3410: Merge branch 'master' into 3410-replication-attrs
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import logging
3 import os
4 import pprint
5 import sys
6 import types
7 import subprocess
8 import json
9 import UserDict
10 import re
11 import hashlib
12 import string
13 import bz2
14 import zlib
15 import fcntl
16 import time
17 import threading
18 import timer
19 import datetime
20 import ssl
21 import socket
22 import requests
23
24 import arvados
25 import arvados.config as config
26 import arvados.errors
27 import arvados.retry as retry
28 import arvados.util
29
30 _logger = logging.getLogger('arvados.keep')
31 global_client_object = None
32
33 class KeepLocator(object):
34     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
36
37     def __init__(self, locator_str):
38         self.hints = []
39         self._perm_sig = None
40         self._perm_expiry = None
41         pieces = iter(locator_str.split('+'))
42         self.md5sum = next(pieces)
43         try:
44             self.size = int(next(pieces))
45         except StopIteration:
46             self.size = None
47         for hint in pieces:
48             if self.HINT_RE.match(hint) is None:
49                 raise ValueError("unrecognized hint data {}".format(hint))
50             elif hint.startswith('A'):
51                 self.parse_permission_hint(hint)
52             else:
53                 self.hints.append(hint)
54
55     def __str__(self):
56         return '+'.join(
57             str(s) for s in [self.md5sum, self.size,
58                              self.permission_hint()] + self.hints
59             if s is not None)
60
61     def _make_hex_prop(name, length):
62         # Build and return a new property with the given name that
63         # must be a hex string of the given length.
64         data_name = '_{}'.format(name)
65         def getter(self):
66             return getattr(self, data_name)
67         def setter(self, hex_str):
68             if not arvados.util.is_hex(hex_str, length):
69                 raise ValueError("{} must be a {}-digit hex string: {}".
70                                  format(name, length, hex_str))
71             setattr(self, data_name, hex_str)
72         return property(getter, setter)
73
74     md5sum = _make_hex_prop('md5sum', 32)
75     perm_sig = _make_hex_prop('perm_sig', 40)
76
77     @property
78     def perm_expiry(self):
79         return self._perm_expiry
80
81     @perm_expiry.setter
82     def perm_expiry(self, value):
83         if not arvados.util.is_hex(value, 1, 8):
84             raise ValueError(
85                 "permission timestamp must be a hex Unix timestamp: {}".
86                 format(value))
87         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
88
89     def permission_hint(self):
90         data = [self.perm_sig, self.perm_expiry]
91         if None in data:
92             return None
93         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
94         return "A{}@{:08x}".format(*data)
95
96     def parse_permission_hint(self, s):
97         try:
98             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
99         except IndexError:
100             raise ValueError("bad permission hint {}".format(s))
101
102     def permission_expired(self, as_of_dt=None):
103         if self.perm_expiry is None:
104             return False
105         elif as_of_dt is None:
106             as_of_dt = datetime.datetime.now()
107         return self.perm_expiry <= as_of_dt
108
109
110 class Keep(object):
111     """Simple interface to a global KeepClient object.
112
113     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
114     own API client.  The global KeepClient will build an API client from the
115     current Arvados configuration, which may not match the one you built.
116     """
117     _last_key = None
118
119     @classmethod
120     def global_client_object(cls):
121         global global_client_object
122         # Previously, KeepClient would change its behavior at runtime based
123         # on these configuration settings.  We simulate that behavior here
124         # by checking the values and returning a new KeepClient if any of
125         # them have changed.
126         key = (config.get('ARVADOS_API_HOST'),
127                config.get('ARVADOS_API_TOKEN'),
128                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
129                config.get('ARVADOS_KEEP_PROXY'),
130                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
131                os.environ.get('KEEP_LOCAL_STORE'))
132         if (global_client_object is None) or (cls._last_key != key):
133             global_client_object = KeepClient()
134             cls._last_key = key
135         return global_client_object
136
137     @staticmethod
138     def get(locator, **kwargs):
139         return Keep.global_client_object().get(locator, **kwargs)
140
141     @staticmethod
142     def put(data, **kwargs):
143         return Keep.global_client_object().put(data, **kwargs)
144
145 class KeepBlockCache(object):
146     # Default RAM cache is 256MiB
147     def __init__(self, cache_max=(256 * 1024 * 1024)):
148         self.cache_max = cache_max
149         self._cache = []
150         self._cache_lock = threading.Lock()
151
152     class CacheSlot(object):
153         def __init__(self, locator):
154             self.locator = locator
155             self.ready = threading.Event()
156             self.content = None
157
158         def get(self):
159             self.ready.wait()
160             return self.content
161
162         def set(self, value):
163             self.content = value
164             self.ready.set()
165
166         def size(self):
167             if self.content is None:
168                 return 0
169             else:
170                 return len(self.content)
171
172     def cap_cache(self):
173         '''Cap the cache size to self.cache_max'''
174         self._cache_lock.acquire()
175         try:
176             # Select all slots except those where ready.is_set() and content is
177             # None (that means there was an error reading the block).
178             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
179             sm = sum([slot.size() for slot in self._cache])
180             while len(self._cache) > 0 and sm > self.cache_max:
181                 for i in xrange(len(self._cache)-1, -1, -1):
182                     if self._cache[i].ready.is_set():
183                         del self._cache[i]
184                         break
185                 sm = sum([slot.size() for slot in self._cache])
186         finally:
187             self._cache_lock.release()
188
189     def reserve_cache(self, locator):
190         '''Reserve a cache slot for the specified locator,
191         or return the existing slot.'''
192         self._cache_lock.acquire()
193         try:
194             # Test if the locator is already in the cache
195             for i in xrange(0, len(self._cache)):
196                 if self._cache[i].locator == locator:
197                     n = self._cache[i]
198                     if i != 0:
199                         # move it to the front
200                         del self._cache[i]
201                         self._cache.insert(0, n)
202                     return n, False
203
204             # Add a new cache slot for the locator
205             n = KeepBlockCache.CacheSlot(locator)
206             self._cache.insert(0, n)
207             return n, True
208         finally:
209             self._cache_lock.release()
210
211 class KeepClient(object):
212
213     # Default Keep server connection timeout:  2 seconds
214     # Default Keep server read timeout:      300 seconds
215     # Default Keep proxy connection timeout:  20 seconds
216     # Default Keep proxy read timeout:       300 seconds
217     DEFAULT_TIMEOUT = (2, 300)
218     DEFAULT_PROXY_TIMEOUT = (20, 300)
219
220     class ThreadLimiter(object):
221         """
222         Limit the number of threads running at a given time to
223         {desired successes} minus {successes reported}. When successes
224         reported == desired, wake up the remaining threads and tell
225         them to quit.
226
227         Should be used in a "with" block.
228         """
229         def __init__(self, todo):
230             self._todo = todo
231             self._done = 0
232             self._response = None
233             self._todo_lock = threading.Semaphore(todo)
234             self._done_lock = threading.Lock()
235
236         def __enter__(self):
237             self._todo_lock.acquire()
238             return self
239
240         def __exit__(self, type, value, traceback):
241             self._todo_lock.release()
242
243         def shall_i_proceed(self):
244             """
245             Return true if the current thread should do stuff. Return
246             false if the current thread should just stop.
247             """
248             with self._done_lock:
249                 return (self._done < self._todo)
250
251         def save_response(self, response_body, replicas_stored):
252             """
253             Records a response body (a locator, possibly signed) returned by
254             the Keep server.  It is not necessary to save more than
255             one response, since we presume that any locator returned
256             in response to a successful request is valid.
257             """
258             with self._done_lock:
259                 self._done += replicas_stored
260                 self._response = response_body
261
262         def response(self):
263             """
264             Returns the body from the response to a PUT request.
265             """
266             with self._done_lock:
267                 return self._response
268
269         def done(self):
270             """
271             Return how many successes were reported.
272             """
273             with self._done_lock:
274                 return self._done
275
276
277     class KeepService(object):
278         # Make requests to a single Keep service, and track results.
279         HTTP_ERRORS = (requests.exceptions.RequestException,
280                        socket.error, ssl.SSLError)
281
282         def __init__(self, root, **headers):
283             self.root = root
284             self.last_result = None
285             self.success_flag = None
286             self.get_headers = {'Accept': 'application/octet-stream'}
287             self.get_headers.update(headers)
288             self.put_headers = headers
289
290         def usable(self):
291             return self.success_flag is not False
292
293         def finished(self):
294             return self.success_flag is not None
295
296         def last_status(self):
297             try:
298                 return self.last_result.status_code
299             except AttributeError:
300                 return None
301
302         def get(self, locator, timeout=None):
303             # locator is a KeepLocator object.
304             url = self.root + str(locator)
305             _logger.debug("Request: GET %s", url)
306             try:
307                 with timer.Timer() as t:
308                     result = requests.get(url.encode('utf-8'),
309                                           headers=self.get_headers,
310                                           timeout=timeout)
311             except self.HTTP_ERRORS as e:
312                 _logger.debug("Request fail: GET %s => %s: %s",
313                               url, type(e), str(e))
314                 self.last_result = e
315             else:
316                 self.last_result = result
317                 self.success_flag = retry.check_http_response_success(result)
318                 content = result.content
319                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
320                              self.last_status(), len(content), t.msecs,
321                              (len(content)/(1024.0*1024))/t.secs)
322                 if self.success_flag:
323                     resp_md5 = hashlib.md5(content).hexdigest()
324                     if resp_md5 == locator.md5sum:
325                         return content
326                     _logger.warning("Checksum fail: md5(%s) = %s",
327                                     url, resp_md5)
328             return None
329
330         def put(self, hash_s, body, timeout=None):
331             url = self.root + hash_s
332             _logger.debug("Request: PUT %s", url)
333             try:
334                 result = requests.put(url.encode('utf-8'),
335                                       data=body,
336                                       headers=self.put_headers,
337                                       timeout=timeout)
338             except self.HTTP_ERRORS as e:
339                 _logger.debug("Request fail: PUT %s => %s: %s",
340                               url, type(e), str(e))
341                 self.last_result = e
342             else:
343                 self.last_result = result
344                 self.success_flag = retry.check_http_response_success(result)
345             return self.success_flag
346
347
348     class KeepWriterThread(threading.Thread):
349         """
350         Write a blob of data to the given Keep server. On success, call
351         save_response() of the given ThreadLimiter to save the returned
352         locator.
353         """
354         def __init__(self, keep_service, **kwargs):
355             super(KeepClient.KeepWriterThread, self).__init__()
356             self.service = keep_service
357             self.args = kwargs
358             self._success = False
359
360         def success(self):
361             return self._success
362
363         def run(self):
364             with self.args['thread_limiter'] as limiter:
365                 if not limiter.shall_i_proceed():
366                     # My turn arrived, but the job has been done without
367                     # me.
368                     return
369                 self.run_with_limiter(limiter)
370
371         def run_with_limiter(self, limiter):
372             if self.service.finished():
373                 return
374             _logger.debug("KeepWriterThread %s proceeding %s %s",
375                           str(threading.current_thread()),
376                           self.args['data_hash'],
377                           self.args['service_root'])
378             self._success = bool(self.service.put(
379                 self.args['data_hash'],
380                 self.args['data'],
381                 timeout=self.args.get('timeout', None)))
382             status = self.service.last_status()
383             if self._success:
384                 result = self.service.last_result
385                 _logger.debug("KeepWriterThread %s succeeded %s %s",
386                               str(threading.current_thread()),
387                               self.args['data_hash'],
388                               self.args['service_root'])
389                 # Tick the 'done' counter for the number of replica
390                 # reported stored by the server, for the case that
391                 # we're talking to a proxy or other backend that
392                 # stores to multiple copies for us.
393                 try:
394                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
395                 except (KeyError, ValueError):
396                     replicas_stored = 1
397                 limiter.save_response(result.text.strip(), replicas_stored)
398             elif status is not None:
399                 _logger.debug("Request fail: PUT %s => %s %s",
400                               self.args['data_hash'], status,
401                               self.service.last_result.text)
402
403
404     def __init__(self, api_client=None, proxy=None,
405                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
406                  api_token=None, local_store=None, block_cache=None,
407                  num_retries=0):
408         """Initialize a new KeepClient.
409
410         Arguments:
411         * api_client: The API client to use to find Keep services.  If not
412           provided, KeepClient will build one from available Arvados
413           configuration.
414         * proxy: If specified, this KeepClient will send requests to this
415           Keep proxy.  Otherwise, KeepClient will fall back to the setting
416           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
417           ensure KeepClient does not use a proxy, pass in an empty string.
418         * timeout: The timeout (in seconds) for HTTP requests to Keep
419           non-proxy servers.  A tuple of two floats is interpreted as
420           (connection_timeout, read_timeout): see
421           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
422           Default: (2, 300).
423         * proxy_timeout: The timeout (in seconds) for HTTP requests to
424           Keep proxies. A tuple of two floats is interpreted as
425           (connection_timeout, read_timeout). Default: (20, 300).
426         * api_token: If you're not using an API client, but only talking
427           directly to a Keep proxy, this parameter specifies an API token
428           to authenticate Keep requests.  It is an error to specify both
429           api_client and api_token.  If you specify neither, KeepClient
430           will use one available from the Arvados configuration.
431         * local_store: If specified, this KeepClient will bypass Keep
432           services, and save data to the named directory.  If unspecified,
433           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
434           environment variable.  If you want to ensure KeepClient does not
435           use local storage, pass in an empty string.  This is primarily
436           intended to mock a server for testing.
437         * num_retries: The default number of times to retry failed requests.
438           This will be used as the default num_retries value when get() and
439           put() are called.  Default 0.
440         """
441         self.lock = threading.Lock()
442         if proxy is None:
443             proxy = config.get('ARVADOS_KEEP_PROXY')
444         if api_token is None:
445             if api_client is None:
446                 api_token = config.get('ARVADOS_API_TOKEN')
447             else:
448                 api_token = api_client.api_token
449         elif api_client is not None:
450             raise ValueError(
451                 "can't build KeepClient with both API client and token")
452         if local_store is None:
453             local_store = os.environ.get('KEEP_LOCAL_STORE')
454
455         self.block_cache = block_cache if block_cache else KeepBlockCache()
456         self.timeout = timeout
457         self.proxy_timeout = proxy_timeout
458
459         if local_store:
460             self.local_store = local_store
461             self.get = self.local_store_get
462             self.put = self.local_store_put
463         else:
464             self.num_retries = num_retries
465             if proxy:
466                 if not proxy.endswith('/'):
467                     proxy += '/'
468                 self.api_token = api_token
469                 self._keep_services = [{
470                     'uuid': 'proxy',
471                     '_service_root': proxy,
472                     }]
473                 self.using_proxy = True
474                 self._static_services_list = True
475             else:
476                 # It's important to avoid instantiating an API client
477                 # unless we actually need one, for testing's sake.
478                 if api_client is None:
479                     api_client = arvados.api('v1')
480                 self.api_client = api_client
481                 self.api_token = api_client.api_token
482                 self._keep_services = None
483                 self.using_proxy = None
484                 self._static_services_list = False
485
486     def current_timeout(self):
487         """Return the appropriate timeout to use for this client: the proxy
488         timeout setting if the backend service is currently a proxy,
489         the regular timeout setting otherwise.
490         """
491         # TODO(twp): the timeout should be a property of a
492         # KeepService, not a KeepClient. See #4488.
493         return self.proxy_timeout if self.using_proxy else self.timeout
494
495     def build_services_list(self, force_rebuild=False):
496         if (self._static_services_list or
497               (self._keep_services and not force_rebuild)):
498             return
499         with self.lock:
500             try:
501                 keep_services = self.api_client.keep_services().accessible()
502             except Exception:  # API server predates Keep services.
503                 keep_services = self.api_client.keep_disks().list()
504
505             self._keep_services = keep_services.execute().get('items')
506             if not self._keep_services:
507                 raise arvados.errors.NoKeepServersError()
508
509             self.using_proxy = any(ks.get('service_type') == 'proxy'
510                                    for ks in self._keep_services)
511
512             # Precompute the base URI for each service.
513             for r in self._keep_services:
514                 r['_service_root'] = "{}://[{}]:{:d}/".format(
515                     'https' if r['service_ssl_flag'] else 'http',
516                     r['service_host'],
517                     r['service_port'])
518             _logger.debug(str(self._keep_services))
519
520     def _service_weight(self, data_hash, service_uuid):
521         """Compute the weight of a Keep service endpoint for a data
522         block with a known hash.
523
524         The weight is md5(h + u) where u is the last 15 characters of
525         the service endpoint's UUID.
526         """
527         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
528
529     def weighted_service_roots(self, data_hash, force_rebuild=False):
530         """Return an array of Keep service endpoints, in the order in
531         which they should be probed when reading or writing data with
532         the given hash.
533         """
534         self.build_services_list(force_rebuild)
535
536         # Sort the available services by weight (heaviest first) for
537         # this data_hash, and return their service_roots (base URIs)
538         # in that order.
539         sorted_roots = [
540             svc['_service_root'] for svc in sorted(
541                 self._keep_services,
542                 reverse=True,
543                 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
544         _logger.debug(data_hash + ': ' + str(sorted_roots))
545         return sorted_roots
546
547     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
548         # roots_map is a dictionary, mapping Keep service root strings
549         # to KeepService objects.  Poll for Keep services, and add any
550         # new ones to roots_map.  Return the current list of local
551         # root strings.
552         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
553         local_roots = self.weighted_service_roots(md5_s, force_rebuild)
554         for root in local_roots:
555             if root not in roots_map:
556                 roots_map[root] = self.KeepService(root, **headers)
557         return local_roots
558
559     @staticmethod
560     def _check_loop_result(result):
561         # KeepClient RetryLoops should save results as a 2-tuple: the
562         # actual result of the request, and the number of servers available
563         # to receive the request this round.
564         # This method returns True if there's a real result, False if
565         # there are no more servers available, otherwise None.
566         if isinstance(result, Exception):
567             return None
568         result, tried_server_count = result
569         if (result is not None) and (result is not False):
570             return True
571         elif tried_server_count < 1:
572             _logger.info("No more Keep services to try; giving up")
573             return False
574         else:
575             return None
576
577     @retry.retry_method
578     def get(self, loc_s, num_retries=None):
579         """Get data from Keep.
580
581         This method fetches one or more blocks of data from Keep.  It
582         sends a request each Keep service registered with the API
583         server (or the proxy provided when this client was
584         instantiated), then each service named in location hints, in
585         sequence.  As soon as one service provides the data, it's
586         returned.
587
588         Arguments:
589         * loc_s: A string of one or more comma-separated locators to fetch.
590           This method returns the concatenation of these blocks.
591         * num_retries: The number of times to retry GET requests to
592           *each* Keep server if it returns temporary failures, with
593           exponential backoff.  Note that, in each loop, the method may try
594           to fetch data from every available Keep service, along with any
595           that are named in location hints in the locator.  The default value
596           is set when the KeepClient is initialized.
597         """
598         if ',' in loc_s:
599             return ''.join(self.get(x) for x in loc_s.split(','))
600         locator = KeepLocator(loc_s)
601         expect_hash = locator.md5sum
602
603         slot, first = self.block_cache.reserve_cache(expect_hash)
604         if not first:
605             v = slot.get()
606             return v
607
608         # See #3147 for a discussion of the loop implementation.  Highlights:
609         # * Refresh the list of Keep services after each failure, in case
610         #   it's being updated.
611         # * Retry until we succeed, we're out of retries, or every available
612         #   service has returned permanent failure.
613         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
614                       for hint in locator.hints if hint.startswith('K@')]
615         # Map root URLs their KeepService objects.
616         roots_map = {root: self.KeepService(root) for root in hint_roots}
617         blob = None
618         loop = retry.RetryLoop(num_retries, self._check_loop_result,
619                                backoff_start=2)
620         for tries_left in loop:
621             try:
622                 local_roots = self.map_new_services(
623                     roots_map, expect_hash,
624                     force_rebuild=(tries_left < num_retries))
625             except Exception as error:
626                 loop.save_result(error)
627                 continue
628
629             # Query KeepService objects that haven't returned
630             # permanent failure, in our specified shuffle order.
631             services_to_try = [roots_map[root]
632                                for root in (local_roots + hint_roots)
633                                if roots_map[root].usable()]
634             for keep_service in services_to_try:
635                 blob = keep_service.get(locator, timeout=self.current_timeout())
636                 if blob is not None:
637                     break
638             loop.save_result((blob, len(services_to_try)))
639
640         # Always cache the result, then return it if we succeeded.
641         slot.set(blob)
642         self.block_cache.cap_cache()
643         if loop.success():
644             return blob
645
646         try:
647             all_roots = local_roots + hint_roots
648         except NameError:
649             # We never successfully fetched local_roots.
650             all_roots = hint_roots
651         # Q: Including 403 is necessary for the Keep tests to continue
652         # passing, but maybe they should expect KeepReadError instead?
653         not_founds = sum(1 for key in all_roots
654                          if roots_map[key].last_status() in {403, 404, 410})
655         service_errors = ((key, roots_map[key].last_result)
656                           for key in all_roots)
657         if not roots_map:
658             raise arvados.errors.KeepReadError(
659                 "failed to read {}: no Keep services available ({})".format(
660                     loc_s, loop.last_result()))
661         elif not_founds == len(all_roots):
662             raise arvados.errors.NotFoundError(
663                 "{} not found".format(loc_s), service_errors)
664         else:
665             raise arvados.errors.KeepReadError(
666                 "failed to read {}".format(loc_s), service_errors)
667
668     @retry.retry_method
669     def put(self, data, copies=2, num_retries=None):
670         """Save data in Keep.
671
672         This method will get a list of Keep services from the API server, and
673         send the data to each one simultaneously in a new thread.  Once the
674         uploads are finished, if enough copies are saved, this method returns
675         the most recent HTTP response body.  If requests fail to upload
676         enough copies, this method raises KeepWriteError.
677
678         Arguments:
679         * data: The string of data to upload.
680         * copies: The number of copies that the user requires be saved.
681           Default 2.
682         * num_retries: The number of times to retry PUT requests to
683           *each* Keep server if it returns temporary failures, with
684           exponential backoff.  The default value is set when the
685           KeepClient is initialized.
686         """
687         data_hash = hashlib.md5(data).hexdigest()
688         if copies < 1:
689             return data_hash
690
691         headers = {}
692         if self.using_proxy:
693             # Tell the proxy how many copies we want it to store
694             headers['X-Keep-Desired-Replication'] = str(copies)
695         roots_map = {}
696         thread_limiter = KeepClient.ThreadLimiter(copies)
697         loop = retry.RetryLoop(num_retries, self._check_loop_result,
698                                backoff_start=2)
699         for tries_left in loop:
700             try:
701                 local_roots = self.map_new_services(
702                     roots_map, data_hash,
703                     force_rebuild=(tries_left < num_retries), **headers)
704             except Exception as error:
705                 loop.save_result(error)
706                 continue
707
708             threads = []
709             for service_root, ks in roots_map.iteritems():
710                 if ks.finished():
711                     continue
712                 t = KeepClient.KeepWriterThread(
713                     ks,
714                     data=data,
715                     data_hash=data_hash,
716                     service_root=service_root,
717                     thread_limiter=thread_limiter,
718                     timeout=self.current_timeout())
719                 t.start()
720                 threads.append(t)
721             for t in threads:
722                 t.join()
723             loop.save_result((thread_limiter.done() >= copies, len(threads)))
724
725         if loop.success():
726             return thread_limiter.response()
727         if not roots_map:
728             raise arvados.errors.KeepWriteError(
729                 "failed to write {}: no Keep services available ({})".format(
730                     data_hash, loop.last_result()))
731         else:
732             service_errors = ((key, roots_map[key].last_result)
733                               for key in local_roots
734                               if not roots_map[key].success_flag)
735             raise arvados.errors.KeepWriteError(
736                 "failed to write {} (wanted {} copies but wrote {})".format(
737                     data_hash, copies, thread_limiter.done()), service_errors)
738
739     def local_store_put(self, data, copies=1, num_retries=None):
740         """A stub for put().
741
742         This method is used in place of the real put() method when
743         using local storage (see constructor's local_store argument).
744
745         copies and num_retries arguments are ignored: they are here
746         only for the sake of offering the same call signature as
747         put().
748
749         Data stored this way can be retrieved via local_store_get().
750         """
751         md5 = hashlib.md5(data).hexdigest()
752         locator = '%s+%d' % (md5, len(data))
753         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
754             f.write(data)
755         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
756                   os.path.join(self.local_store, md5))
757         return locator
758
759     def local_store_get(self, loc_s, num_retries=None):
760         """Companion to local_store_put()."""
761         try:
762             locator = KeepLocator(loc_s)
763         except ValueError:
764             raise arvados.errors.NotFoundError(
765                 "Invalid data locator: '%s'" % loc_s)
766         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
767             return ''
768         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
769             return f.read()