3198: KeepClient creates a requests session to re-use connections. export_manifest...
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import logging
3 import os
4 import pprint
5 import sys
6 import types
7 import subprocess
8 import json
9 import UserDict
10 import re
11 import hashlib
12 import string
13 import bz2
14 import zlib
15 import fcntl
16 import time
17 import threading
18 import timer
19 import datetime
20 import ssl
21 import socket
22 import requests
23
24 import arvados
25 import arvados.config as config
26 import arvados.errors
27 import arvados.retry as retry
28 import arvados.util
29
30 _logger = logging.getLogger('arvados.keep')
31 global_client_object = None
32
33 class KeepLocator(object):
34     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
36
37     def __init__(self, locator_str):
38         self.hints = []
39         self._perm_sig = None
40         self._perm_expiry = None
41         pieces = iter(locator_str.split('+'))
42         self.md5sum = next(pieces)
43         try:
44             self.size = int(next(pieces))
45         except StopIteration:
46             self.size = None
47         for hint in pieces:
48             if self.HINT_RE.match(hint) is None:
49                 raise ValueError("unrecognized hint data {}".format(hint))
50             elif hint.startswith('A'):
51                 self.parse_permission_hint(hint)
52             else:
53                 self.hints.append(hint)
54
55     def __str__(self):
56         return '+'.join(
57             str(s) for s in [self.md5sum, self.size,
58                              self.permission_hint()] + self.hints
59             if s is not None)
60
61     def stripped(self):
62         return "%s+%i" % (self.md5sum, self.size)
63
64     def _make_hex_prop(name, length):
65         # Build and return a new property with the given name that
66         # must be a hex string of the given length.
67         data_name = '_{}'.format(name)
68         def getter(self):
69             return getattr(self, data_name)
70         def setter(self, hex_str):
71             if not arvados.util.is_hex(hex_str, length):
72                 raise ValueError("{} must be a {}-digit hex string: {}".
73                                  format(name, length, hex_str))
74             setattr(self, data_name, hex_str)
75         return property(getter, setter)
76
77     md5sum = _make_hex_prop('md5sum', 32)
78     perm_sig = _make_hex_prop('perm_sig', 40)
79
80     @property
81     def perm_expiry(self):
82         return self._perm_expiry
83
84     @perm_expiry.setter
85     def perm_expiry(self, value):
86         if not arvados.util.is_hex(value, 1, 8):
87             raise ValueError(
88                 "permission timestamp must be a hex Unix timestamp: {}".
89                 format(value))
90         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
91
92     def permission_hint(self):
93         data = [self.perm_sig, self.perm_expiry]
94         if None in data:
95             return None
96         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
97         return "A{}@{:08x}".format(*data)
98
99     def parse_permission_hint(self, s):
100         try:
101             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
102         except IndexError:
103             raise ValueError("bad permission hint {}".format(s))
104
105     def permission_expired(self, as_of_dt=None):
106         if self.perm_expiry is None:
107             return False
108         elif as_of_dt is None:
109             as_of_dt = datetime.datetime.now()
110         return self.perm_expiry <= as_of_dt
111
112
113 class Keep(object):
114     """Simple interface to a global KeepClient object.
115
116     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
117     own API client.  The global KeepClient will build an API client from the
118     current Arvados configuration, which may not match the one you built.
119     """
120     _last_key = None
121
122     @classmethod
123     def global_client_object(cls):
124         global global_client_object
125         # Previously, KeepClient would change its behavior at runtime based
126         # on these configuration settings.  We simulate that behavior here
127         # by checking the values and returning a new KeepClient if any of
128         # them have changed.
129         key = (config.get('ARVADOS_API_HOST'),
130                config.get('ARVADOS_API_TOKEN'),
131                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
132                config.get('ARVADOS_KEEP_PROXY'),
133                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
134                os.environ.get('KEEP_LOCAL_STORE'))
135         if (global_client_object is None) or (cls._last_key != key):
136             global_client_object = KeepClient()
137             cls._last_key = key
138         return global_client_object
139
140     @staticmethod
141     def get(locator, **kwargs):
142         return Keep.global_client_object().get(locator, **kwargs)
143
144     @staticmethod
145     def put(data, **kwargs):
146         return Keep.global_client_object().put(data, **kwargs)
147
148 class KeepBlockCache(object):
149     # Default RAM cache is 256MiB
150     def __init__(self, cache_max=(256 * 1024 * 1024)):
151         self.cache_max = cache_max
152         self._cache = []
153         self._cache_lock = threading.Lock()
154
155     class CacheSlot(object):
156         def __init__(self, locator):
157             self.locator = locator
158             self.ready = threading.Event()
159             self.content = None
160
161         def get(self):
162             self.ready.wait()
163             return self.content
164
165         def set(self, value):
166             self.content = value
167             self.ready.set()
168
169         def size(self):
170             if self.content is None:
171                 return 0
172             else:
173                 return len(self.content)
174
175     def cap_cache(self):
176         '''Cap the cache size to self.cache_max'''
177         with self._cache_lock:
178             # Select all slots except those where ready.is_set() and content is
179             # None (that means there was an error reading the block).
180             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
181             sm = sum([slot.size() for slot in self._cache])
182             while len(self._cache) > 0 and sm > self.cache_max:
183                 for i in xrange(len(self._cache)-1, -1, -1):
184                     if self._cache[i].ready.is_set():
185                         del self._cache[i]
186                         break
187                 sm = sum([slot.size() for slot in self._cache])
188
189     def reserve_cache(self, locator):
190         '''Reserve a cache slot for the specified locator,
191         or return the existing slot.'''
192         with self._cache_lock:
193             # Test if the locator is already in the cache
194             for i in xrange(0, len(self._cache)):
195                 if self._cache[i].locator == locator:
196                     n = self._cache[i]
197                     if i != 0:
198                         # move it to the front
199                         del self._cache[i]
200                         self._cache.insert(0, n)
201                     return n, False
202
203             # Add a new cache slot for the locator
204             n = KeepBlockCache.CacheSlot(locator)
205             self._cache.insert(0, n)
206             return n, True
207
208 class KeepClient(object):
209
210     # Default Keep server connection timeout:  2 seconds
211     # Default Keep server read timeout:      300 seconds
212     # Default Keep proxy connection timeout:  20 seconds
213     # Default Keep proxy read timeout:       300 seconds
214     DEFAULT_TIMEOUT = (2, 300)
215     DEFAULT_PROXY_TIMEOUT = (20, 300)
216
217     class ThreadLimiter(object):
218         """
219         Limit the number of threads running at a given time to
220         {desired successes} minus {successes reported}. When successes
221         reported == desired, wake up the remaining threads and tell
222         them to quit.
223
224         Should be used in a "with" block.
225         """
226         def __init__(self, todo):
227             self._todo = todo
228             self._done = 0
229             self._response = None
230             self._todo_lock = threading.Semaphore(todo)
231             self._done_lock = threading.Lock()
232
233         def __enter__(self):
234             self._todo_lock.acquire()
235             return self
236
237         def __exit__(self, type, value, traceback):
238             self._todo_lock.release()
239
240         def shall_i_proceed(self):
241             """
242             Return true if the current thread should do stuff. Return
243             false if the current thread should just stop.
244             """
245             with self._done_lock:
246                 return (self._done < self._todo)
247
248         def save_response(self, response_body, replicas_stored):
249             """
250             Records a response body (a locator, possibly signed) returned by
251             the Keep server.  It is not necessary to save more than
252             one response, since we presume that any locator returned
253             in response to a successful request is valid.
254             """
255             with self._done_lock:
256                 self._done += replicas_stored
257                 self._response = response_body
258
259         def response(self):
260             """
261             Returns the body from the response to a PUT request.
262             """
263             with self._done_lock:
264                 return self._response
265
266         def done(self):
267             """
268             Return how many successes were reported.
269             """
270             with self._done_lock:
271                 return self._done
272
273
274     class KeepService(object):
275         # Make requests to a single Keep service, and track results.
276         HTTP_ERRORS = (requests.exceptions.RequestException,
277                        socket.error, ssl.SSLError)
278
279         def __init__(self, root, session, **headers):
280             self.root = root
281             self.last_result = None
282             self.success_flag = None
283             self.session = session
284             self.get_headers = {'Accept': 'application/octet-stream'}
285             self.get_headers.update(headers)
286             self.put_headers = headers
287
288         def usable(self):
289             return self.success_flag is not False
290
291         def finished(self):
292             return self.success_flag is not None
293
294         def last_status(self):
295             try:
296                 return self.last_result.status_code
297             except AttributeError:
298                 return None
299
300         def get(self, locator, timeout=None):
301             # locator is a KeepLocator object.
302             url = self.root + str(locator)
303             _logger.debug("Request: GET %s", url)
304             try:
305                 with timer.Timer() as t:
306                     result = self.session.get(url.encode('utf-8'),
307                                           headers=self.get_headers,
308                                           timeout=timeout)
309             except self.HTTP_ERRORS as e:
310                 _logger.debug("Request fail: GET %s => %s: %s",
311                               url, type(e), str(e))
312                 self.last_result = e
313             else:
314                 self.last_result = result
315                 self.success_flag = retry.check_http_response_success(result)
316                 content = result.content
317                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
318                              self.last_status(), len(content), t.msecs,
319                              (len(content)/(1024.0*1024))/t.secs)
320                 if self.success_flag:
321                     resp_md5 = hashlib.md5(content).hexdigest()
322                     if resp_md5 == locator.md5sum:
323                         return content
324                     _logger.warning("Checksum fail: md5(%s) = %s",
325                                     url, resp_md5)
326             return None
327
328         def put(self, hash_s, body, timeout=None):
329             url = self.root + hash_s
330             _logger.debug("Request: PUT %s", url)
331             try:
332                 result = self.session.put(url.encode('utf-8'),
333                                       data=body,
334                                       headers=self.put_headers,
335                                       timeout=timeout)
336             except self.HTTP_ERRORS as e:
337                 _logger.debug("Request fail: PUT %s => %s: %s",
338                               url, type(e), str(e))
339                 self.last_result = e
340             else:
341                 self.last_result = result
342                 self.success_flag = retry.check_http_response_success(result)
343             return self.success_flag
344
345
346     class KeepWriterThread(threading.Thread):
347         """
348         Write a blob of data to the given Keep server. On success, call
349         save_response() of the given ThreadLimiter to save the returned
350         locator.
351         """
352         def __init__(self, keep_service, **kwargs):
353             super(KeepClient.KeepWriterThread, self).__init__()
354             self.service = keep_service
355             self.args = kwargs
356             self._success = False
357
358         def success(self):
359             return self._success
360
361         def run(self):
362             with self.args['thread_limiter'] as limiter:
363                 if not limiter.shall_i_proceed():
364                     # My turn arrived, but the job has been done without
365                     # me.
366                     return
367                 self.run_with_limiter(limiter)
368
369         def run_with_limiter(self, limiter):
370             if self.service.finished():
371                 return
372             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
373                           str(threading.current_thread()),
374                           self.args['data_hash'],
375                           len(self.args['data']),
376                           self.args['service_root'])
377             self._success = bool(self.service.put(
378                 self.args['data_hash'],
379                 self.args['data'],
380                 timeout=self.args.get('timeout', None)))
381             status = self.service.last_status()
382             if self._success:
383                 result = self.service.last_result
384                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
385                               str(threading.current_thread()),
386                               self.args['data_hash'],
387                               len(self.args['data']),
388                               self.args['service_root'])
389                 # Tick the 'done' counter for the number of replica
390                 # reported stored by the server, for the case that
391                 # we're talking to a proxy or other backend that
392                 # stores to multiple copies for us.
393                 try:
394                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
395                 except (KeyError, ValueError):
396                     replicas_stored = 1
397                 limiter.save_response(result.text.strip(), replicas_stored)
398             elif status is not None:
399                 _logger.debug("Request fail: PUT %s => %s %s",
400                               self.args['data_hash'], status,
401                               self.service.last_result.text)
402
403
404     def __init__(self, api_client=None, proxy=None,
405                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
406                  api_token=None, local_store=None, block_cache=None,
407                  num_retries=0):
408         """Initialize a new KeepClient.
409
410         Arguments:
411         * api_client: The API client to use to find Keep services.  If not
412           provided, KeepClient will build one from available Arvados
413           configuration.
414         * proxy: If specified, this KeepClient will send requests to this
415           Keep proxy.  Otherwise, KeepClient will fall back to the setting
416           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
417           ensure KeepClient does not use a proxy, pass in an empty string.
418         * timeout: The timeout (in seconds) for HTTP requests to Keep
419           non-proxy servers.  A tuple of two floats is interpreted as
420           (connection_timeout, read_timeout): see
421           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
422           Default: (2, 300).
423         * proxy_timeout: The timeout (in seconds) for HTTP requests to
424           Keep proxies. A tuple of two floats is interpreted as
425           (connection_timeout, read_timeout). Default: (20, 300).
426         * api_token: If you're not using an API client, but only talking
427           directly to a Keep proxy, this parameter specifies an API token
428           to authenticate Keep requests.  It is an error to specify both
429           api_client and api_token.  If you specify neither, KeepClient
430           will use one available from the Arvados configuration.
431         * local_store: If specified, this KeepClient will bypass Keep
432           services, and save data to the named directory.  If unspecified,
433           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
434           environment variable.  If you want to ensure KeepClient does not
435           use local storage, pass in an empty string.  This is primarily
436           intended to mock a server for testing.
437         * num_retries: The default number of times to retry failed requests.
438           This will be used as the default num_retries value when get() and
439           put() are called.  Default 0.
440         """
441         self.lock = threading.Lock()
442         if proxy is None:
443             proxy = config.get('ARVADOS_KEEP_PROXY')
444         if api_token is None:
445             if api_client is None:
446                 api_token = config.get('ARVADOS_API_TOKEN')
447             else:
448                 api_token = api_client.api_token
449         elif api_client is not None:
450             raise ValueError(
451                 "can't build KeepClient with both API client and token")
452         if local_store is None:
453             local_store = os.environ.get('KEEP_LOCAL_STORE')
454
455         self.block_cache = block_cache if block_cache else KeepBlockCache()
456         self.timeout = timeout
457         self.proxy_timeout = proxy_timeout
458
459         if local_store:
460             self.local_store = local_store
461             self.get = self.local_store_get
462             self.put = self.local_store_put
463         else:
464             self.num_retries = num_retries
465             self.session = requests.Session()
466             if proxy:
467                 if not proxy.endswith('/'):
468                     proxy += '/'
469                 self.api_token = api_token
470                 self._keep_services = [{
471                     'uuid': 'proxy',
472                     '_service_root': proxy,
473                     }]
474                 self.using_proxy = True
475                 self._static_services_list = True
476             else:
477                 # It's important to avoid instantiating an API client
478                 # unless we actually need one, for testing's sake.
479                 if api_client is None:
480                     api_client = arvados.api('v1')
481                 self.api_client = api_client
482                 self.api_token = api_client.api_token
483                 self._keep_services = None
484                 self.using_proxy = None
485                 self._static_services_list = False
486
487     def current_timeout(self):
488         """Return the appropriate timeout to use for this client: the proxy
489         timeout setting if the backend service is currently a proxy,
490         the regular timeout setting otherwise.
491         """
492         # TODO(twp): the timeout should be a property of a
493         # KeepService, not a KeepClient. See #4488.
494         return self.proxy_timeout if self.using_proxy else self.timeout
495
496     def build_services_list(self, force_rebuild=False):
497         if (self._static_services_list or
498               (self._keep_services and not force_rebuild)):
499             return
500         with self.lock:
501             try:
502                 keep_services = self.api_client.keep_services().accessible()
503             except Exception:  # API server predates Keep services.
504                 keep_services = self.api_client.keep_disks().list()
505
506             self._keep_services = keep_services.execute().get('items')
507             if not self._keep_services:
508                 raise arvados.errors.NoKeepServersError()
509
510             self.using_proxy = any(ks.get('service_type') == 'proxy'
511                                    for ks in self._keep_services)
512
513             # Precompute the base URI for each service.
514             for r in self._keep_services:
515                 r['_service_root'] = "{}://[{}]:{:d}/".format(
516                     'https' if r['service_ssl_flag'] else 'http',
517                     r['service_host'],
518                     r['service_port'])
519             _logger.debug(str(self._keep_services))
520
521     def _service_weight(self, data_hash, service_uuid):
522         """Compute the weight of a Keep service endpoint for a data
523         block with a known hash.
524
525         The weight is md5(h + u) where u is the last 15 characters of
526         the service endpoint's UUID.
527         """
528         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
529
530     def weighted_service_roots(self, data_hash, force_rebuild=False):
531         """Return an array of Keep service endpoints, in the order in
532         which they should be probed when reading or writing data with
533         the given hash.
534         """
535         self.build_services_list(force_rebuild)
536
537         # Sort the available services by weight (heaviest first) for
538         # this data_hash, and return their service_roots (base URIs)
539         # in that order.
540         sorted_roots = [
541             svc['_service_root'] for svc in sorted(
542                 self._keep_services,
543                 reverse=True,
544                 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
545         _logger.debug(data_hash + ': ' + str(sorted_roots))
546         return sorted_roots
547
548     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
549         # roots_map is a dictionary, mapping Keep service root strings
550         # to KeepService objects.  Poll for Keep services, and add any
551         # new ones to roots_map.  Return the current list of local
552         # root strings.
553         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
554         local_roots = self.weighted_service_roots(md5_s, force_rebuild)
555         for root in local_roots:
556             if root not in roots_map:
557                 roots_map[root] = self.KeepService(root, self.session, **headers)
558         return local_roots
559
560     @staticmethod
561     def _check_loop_result(result):
562         # KeepClient RetryLoops should save results as a 2-tuple: the
563         # actual result of the request, and the number of servers available
564         # to receive the request this round.
565         # This method returns True if there's a real result, False if
566         # there are no more servers available, otherwise None.
567         if isinstance(result, Exception):
568             return None
569         result, tried_server_count = result
570         if (result is not None) and (result is not False):
571             return True
572         elif tried_server_count < 1:
573             _logger.info("No more Keep services to try; giving up")
574             return False
575         else:
576             return None
577
578     @retry.retry_method
579     def get(self, loc_s, num_retries=None):
580         """Get data from Keep.
581
582         This method fetches one or more blocks of data from Keep.  It
583         sends a request each Keep service registered with the API
584         server (or the proxy provided when this client was
585         instantiated), then each service named in location hints, in
586         sequence.  As soon as one service provides the data, it's
587         returned.
588
589         Arguments:
590         * loc_s: A string of one or more comma-separated locators to fetch.
591           This method returns the concatenation of these blocks.
592         * num_retries: The number of times to retry GET requests to
593           *each* Keep server if it returns temporary failures, with
594           exponential backoff.  Note that, in each loop, the method may try
595           to fetch data from every available Keep service, along with any
596           that are named in location hints in the locator.  The default value
597           is set when the KeepClient is initialized.
598         """
599         if ',' in loc_s:
600             return ''.join(self.get(x) for x in loc_s.split(','))
601         locator = KeepLocator(loc_s)
602         expect_hash = locator.md5sum
603
604         slot, first = self.block_cache.reserve_cache(expect_hash)
605         if not first:
606             v = slot.get()
607             return v
608
609         # See #3147 for a discussion of the loop implementation.  Highlights:
610         # * Refresh the list of Keep services after each failure, in case
611         #   it's being updated.
612         # * Retry until we succeed, we're out of retries, or every available
613         #   service has returned permanent failure.
614         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
615                       for hint in locator.hints if hint.startswith('K@')]
616         # Map root URLs their KeepService objects.
617         roots_map = {root: self.KeepService(root) for root in hint_roots}
618         blob = None
619         loop = retry.RetryLoop(num_retries, self._check_loop_result,
620                                backoff_start=2)
621         for tries_left in loop:
622             try:
623                 local_roots = self.map_new_services(
624                     roots_map, expect_hash,
625                     force_rebuild=(tries_left < num_retries))
626             except Exception as error:
627                 loop.save_result(error)
628                 continue
629
630             # Query KeepService objects that haven't returned
631             # permanent failure, in our specified shuffle order.
632             services_to_try = [roots_map[root]
633                                for root in (local_roots + hint_roots)
634                                if roots_map[root].usable()]
635             for keep_service in services_to_try:
636                 blob = keep_service.get(locator, timeout=self.current_timeout())
637                 if blob is not None:
638                     break
639             loop.save_result((blob, len(services_to_try)))
640
641         # Always cache the result, then return it if we succeeded.
642         slot.set(blob)
643         self.block_cache.cap_cache()
644         if loop.success():
645             return blob
646
647         # No servers fulfilled the request.  Count how many responded
648         # "not found;" if the ratio is high enough (currently 75%), report
649         # Not Found; otherwise a generic error.
650         # Q: Including 403 is necessary for the Keep tests to continue
651         # passing, but maybe they should expect KeepReadError instead?
652         not_founds = sum(1 for ks in roots_map.values()
653                          if ks.last_status() in set([403, 404, 410]))
654         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
655             raise arvados.errors.NotFoundError(loc_s)
656         else:
657             raise arvados.errors.KeepReadError(loc_s)
658
659     @retry.retry_method
660     def put(self, data, copies=2, num_retries=None):
661         """Save data in Keep.
662
663         This method will get a list of Keep services from the API server, and
664         send the data to each one simultaneously in a new thread.  Once the
665         uploads are finished, if enough copies are saved, this method returns
666         the most recent HTTP response body.  If requests fail to upload
667         enough copies, this method raises KeepWriteError.
668
669         Arguments:
670         * data: The string of data to upload.
671         * copies: The number of copies that the user requires be saved.
672           Default 2.
673         * num_retries: The number of times to retry PUT requests to
674           *each* Keep server if it returns temporary failures, with
675           exponential backoff.  The default value is set when the
676           KeepClient is initialized.
677         """
678         data_hash = hashlib.md5(data).hexdigest()
679         if copies < 1:
680             return data_hash
681
682         headers = {}
683         if self.using_proxy:
684             # Tell the proxy how many copies we want it to store
685             headers['X-Keep-Desired-Replication'] = str(copies)
686         roots_map = {}
687         thread_limiter = KeepClient.ThreadLimiter(copies)
688         loop = retry.RetryLoop(num_retries, self._check_loop_result,
689                                backoff_start=2)
690         for tries_left in loop:
691             try:
692                 local_roots = self.map_new_services(
693                     roots_map, data_hash,
694                     force_rebuild=(tries_left < num_retries), **headers)
695             except Exception as error:
696                 loop.save_result(error)
697                 continue
698
699             threads = []
700             for service_root, ks in roots_map.iteritems():
701                 if ks.finished():
702                     continue
703                 t = KeepClient.KeepWriterThread(
704                     ks,
705                     data=data,
706                     data_hash=data_hash,
707                     service_root=service_root,
708                     thread_limiter=thread_limiter,
709                     timeout=self.current_timeout())
710                 t.start()
711                 threads.append(t)
712             for t in threads:
713                 t.join()
714             loop.save_result((thread_limiter.done() >= copies, len(threads)))
715
716         if loop.success():
717             return thread_limiter.response()
718         raise arvados.errors.KeepWriteError(
719             "Write fail for %s: wanted %d but wrote %d" %
720             (data_hash, copies, thread_limiter.done()))
721
722     # Local storage methods need no-op num_retries arguments to keep
723     # integration tests happy.  With better isolation they could
724     # probably be removed again.
725     def local_store_put(self, data, num_retries=0):
726         md5 = hashlib.md5(data).hexdigest()
727         locator = '%s+%d' % (md5, len(data))
728         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
729             f.write(data)
730         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
731                   os.path.join(self.local_store, md5))
732         return locator
733
734     def local_store_get(self, loc_s, num_retries=0):
735         try:
736             locator = KeepLocator(loc_s)
737         except ValueError:
738             raise arvados.errors.NotFoundError(
739                 "Invalid data locator: '%s'" % loc_s)
740         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
741             return ''
742         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
743             return f.read()