4823: More fixes and cleanups.
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import logging
3 import os
4 import pprint
5 import sys
6 import types
7 import subprocess
8 import json
9 import UserDict
10 import re
11 import hashlib
12 import string
13 import bz2
14 import zlib
15 import fcntl
16 import time
17 import threading
18 import timer
19 import datetime
20 import ssl
21 import socket
22 import requests
23
24 import arvados
25 import arvados.config as config
26 import arvados.errors
27 import arvados.retry as retry
28 import arvados.util
29
30 _logger = logging.getLogger('arvados.keep')
31 global_client_object = None
32
33 class KeepLocator(object):
34     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
36
37     def __init__(self, locator_str):
38         self.hints = []
39         self._perm_sig = None
40         self._perm_expiry = None
41         pieces = iter(locator_str.split('+'))
42         self.md5sum = next(pieces)
43         try:
44             self.size = int(next(pieces))
45         except StopIteration:
46             self.size = None
47         for hint in pieces:
48             if self.HINT_RE.match(hint) is None:
49                 raise ValueError("unrecognized hint data {}".format(hint))
50             elif hint.startswith('A'):
51                 self.parse_permission_hint(hint)
52             else:
53                 self.hints.append(hint)
54
55     def __str__(self):
56         return '+'.join(
57             str(s) for s in [self.md5sum, self.size,
58                              self.permission_hint()] + self.hints
59             if s is not None)
60
61     def stripped(self):
62         if self.size is not None:
63             return "%s+%i" % (self.md5sum, self.size)
64         else:
65             return self.md5sum
66
67     def _make_hex_prop(name, length):
68         # Build and return a new property with the given name that
69         # must be a hex string of the given length.
70         data_name = '_{}'.format(name)
71         def getter(self):
72             return getattr(self, data_name)
73         def setter(self, hex_str):
74             if not arvados.util.is_hex(hex_str, length):
75                 raise ValueError("{} must be a {}-digit hex string: {}".
76                                  format(name, length, hex_str))
77             setattr(self, data_name, hex_str)
78         return property(getter, setter)
79
80     md5sum = _make_hex_prop('md5sum', 32)
81     perm_sig = _make_hex_prop('perm_sig', 40)
82
83     @property
84     def perm_expiry(self):
85         return self._perm_expiry
86
87     @perm_expiry.setter
88     def perm_expiry(self, value):
89         if not arvados.util.is_hex(value, 1, 8):
90             raise ValueError(
91                 "permission timestamp must be a hex Unix timestamp: {}".
92                 format(value))
93         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
94
95     def permission_hint(self):
96         data = [self.perm_sig, self.perm_expiry]
97         if None in data:
98             return None
99         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
100         return "A{}@{:08x}".format(*data)
101
102     def parse_permission_hint(self, s):
103         try:
104             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
105         except IndexError:
106             raise ValueError("bad permission hint {}".format(s))
107
108     def permission_expired(self, as_of_dt=None):
109         if self.perm_expiry is None:
110             return False
111         elif as_of_dt is None:
112             as_of_dt = datetime.datetime.now()
113         return self.perm_expiry <= as_of_dt
114
115
116 class Keep(object):
117     """Simple interface to a global KeepClient object.
118
119     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
120     own API client.  The global KeepClient will build an API client from the
121     current Arvados configuration, which may not match the one you built.
122     """
123     _last_key = None
124
125     @classmethod
126     def global_client_object(cls):
127         global global_client_object
128         # Previously, KeepClient would change its behavior at runtime based
129         # on these configuration settings.  We simulate that behavior here
130         # by checking the values and returning a new KeepClient if any of
131         # them have changed.
132         key = (config.get('ARVADOS_API_HOST'),
133                config.get('ARVADOS_API_TOKEN'),
134                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
135                config.get('ARVADOS_KEEP_PROXY'),
136                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
137                os.environ.get('KEEP_LOCAL_STORE'))
138         if (global_client_object is None) or (cls._last_key != key):
139             global_client_object = KeepClient()
140             cls._last_key = key
141         return global_client_object
142
143     @staticmethod
144     def get(locator, **kwargs):
145         return Keep.global_client_object().get(locator, **kwargs)
146
147     @staticmethod
148     def put(data, **kwargs):
149         return Keep.global_client_object().put(data, **kwargs)
150
151 class KeepBlockCache(object):
152     # Default RAM cache is 256MiB
153     def __init__(self, cache_max=(256 * 1024 * 1024)):
154         self.cache_max = cache_max
155         self._cache = []
156         self._cache_lock = threading.Lock()
157
158     class CacheSlot(object):
159         def __init__(self, locator):
160             self.locator = locator
161             self.ready = threading.Event()
162             self.content = None
163
164         def get(self):
165             self.ready.wait()
166             return self.content
167
168         def set(self, value):
169             self.content = value
170             self.ready.set()
171
172         def size(self):
173             if self.content is None:
174                 return 0
175             else:
176                 return len(self.content)
177
178     def cap_cache(self):
179         '''Cap the cache size to self.cache_max'''
180         with self._cache_lock:
181             # Select all slots except those where ready.is_set() and content is
182             # None (that means there was an error reading the block).
183             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
184             sm = sum([slot.size() for slot in self._cache])
185             while len(self._cache) > 0 and sm > self.cache_max:
186                 for i in xrange(len(self._cache)-1, -1, -1):
187                     if self._cache[i].ready.is_set():
188                         del self._cache[i]
189                         break
190                 sm = sum([slot.size() for slot in self._cache])
191
192     def _get(self, locator):
193         # Test if the locator is already in the cache
194         for i in xrange(0, len(self._cache)):
195             if self._cache[i].locator == locator:
196                 n = self._cache[i]
197                 if i != 0:
198                     # move it to the front
199                     del self._cache[i]
200                     self._cache.insert(0, n)
201                 return n
202         return None
203
204     def get(self, locator):
205         with self._cache_lock:
206             return self._get(locator)
207
208     def reserve_cache(self, locator):
209         '''Reserve a cache slot for the specified locator,
210         or return the existing slot.'''
211         with self._cache_lock:
212             n = self._get(locator)
213             if n:
214                 return n, False
215             else:
216                 # Add a new cache slot for the locator
217                 n = KeepBlockCache.CacheSlot(locator)
218                 self._cache.insert(0, n)
219                 return n, True
220
221 class KeepClient(object):
222
223     # Default Keep server connection timeout:  2 seconds
224     # Default Keep server read timeout:      300 seconds
225     # Default Keep proxy connection timeout:  20 seconds
226     # Default Keep proxy read timeout:       300 seconds
227     DEFAULT_TIMEOUT = (2, 300)
228     DEFAULT_PROXY_TIMEOUT = (20, 300)
229
230     class ThreadLimiter(object):
231         """
232         Limit the number of threads running at a given time to
233         {desired successes} minus {successes reported}. When successes
234         reported == desired, wake up the remaining threads and tell
235         them to quit.
236
237         Should be used in a "with" block.
238         """
239         def __init__(self, todo):
240             self._todo = todo
241             self._done = 0
242             self._response = None
243             self._todo_lock = threading.Semaphore(todo)
244             self._done_lock = threading.Lock()
245
246         def __enter__(self):
247             self._todo_lock.acquire()
248             return self
249
250         def __exit__(self, type, value, traceback):
251             self._todo_lock.release()
252
253         def shall_i_proceed(self):
254             """
255             Return true if the current thread should do stuff. Return
256             false if the current thread should just stop.
257             """
258             with self._done_lock:
259                 return (self._done < self._todo)
260
261         def save_response(self, response_body, replicas_stored):
262             """
263             Records a response body (a locator, possibly signed) returned by
264             the Keep server.  It is not necessary to save more than
265             one response, since we presume that any locator returned
266             in response to a successful request is valid.
267             """
268             with self._done_lock:
269                 self._done += replicas_stored
270                 self._response = response_body
271
272         def response(self):
273             """
274             Returns the body from the response to a PUT request.
275             """
276             with self._done_lock:
277                 return self._response
278
279         def done(self):
280             """
281             Return how many successes were reported.
282             """
283             with self._done_lock:
284                 return self._done
285
286
287     class KeepService(object):
288         # Make requests to a single Keep service, and track results.
289         HTTP_ERRORS = (requests.exceptions.RequestException,
290                        socket.error, ssl.SSLError)
291
292         def __init__(self, root, session, **headers):
293             self.root = root
294             self.last_result = None
295             self.success_flag = None
296             self.session = session
297             self.get_headers = {'Accept': 'application/octet-stream'}
298             self.get_headers.update(headers)
299             self.put_headers = headers
300
301         def usable(self):
302             return self.success_flag is not False
303
304         def finished(self):
305             return self.success_flag is not None
306
307         def last_status(self):
308             try:
309                 return self.last_result.status_code
310             except AttributeError:
311                 return None
312
313         def get(self, locator, timeout=None):
314             # locator is a KeepLocator object.
315             url = self.root + str(locator)
316             _logger.debug("Request: GET %s", url)
317             try:
318                 with timer.Timer() as t:
319                     result = self.session.get(url.encode('utf-8'),
320                                           headers=self.get_headers,
321                                           timeout=timeout)
322             except self.HTTP_ERRORS as e:
323                 _logger.debug("Request fail: GET %s => %s: %s",
324                               url, type(e), str(e))
325                 self.last_result = e
326             else:
327                 self.last_result = result
328                 self.success_flag = retry.check_http_response_success(result)
329                 content = result.content
330                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
331                              self.last_status(), len(content), t.msecs,
332                              (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
333                 if self.success_flag:
334                     resp_md5 = hashlib.md5(content).hexdigest()
335                     if resp_md5 == locator.md5sum:
336                         return content
337                     _logger.warning("Checksum fail: md5(%s) = %s",
338                                     url, resp_md5)
339             return None
340
341         def put(self, hash_s, body, timeout=None):
342             url = self.root + hash_s
343             _logger.debug("Request: PUT %s", url)
344             try:
345                 result = self.session.put(url.encode('utf-8'),
346                                       data=body,
347                                       headers=self.put_headers,
348                                       timeout=timeout)
349             except self.HTTP_ERRORS as e:
350                 _logger.debug("Request fail: PUT %s => %s: %s",
351                               url, type(e), str(e))
352                 self.last_result = e
353             else:
354                 self.last_result = result
355                 self.success_flag = retry.check_http_response_success(result)
356             return self.success_flag
357
358
359     class KeepWriterThread(threading.Thread):
360         """
361         Write a blob of data to the given Keep server. On success, call
362         save_response() of the given ThreadLimiter to save the returned
363         locator.
364         """
365         def __init__(self, keep_service, **kwargs):
366             super(KeepClient.KeepWriterThread, self).__init__()
367             self.service = keep_service
368             self.args = kwargs
369             self._success = False
370
371         def success(self):
372             return self._success
373
374         def run(self):
375             with self.args['thread_limiter'] as limiter:
376                 if not limiter.shall_i_proceed():
377                     # My turn arrived, but the job has been done without
378                     # me.
379                     return
380                 self.run_with_limiter(limiter)
381
382         def run_with_limiter(self, limiter):
383             if self.service.finished():
384                 return
385             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
386                           str(threading.current_thread()),
387                           self.args['data_hash'],
388                           len(self.args['data']),
389                           self.args['service_root'])
390             self._success = bool(self.service.put(
391                 self.args['data_hash'],
392                 self.args['data'],
393                 timeout=self.args.get('timeout', None)))
394             status = self.service.last_status()
395             if self._success:
396                 result = self.service.last_result
397                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
398                               str(threading.current_thread()),
399                               self.args['data_hash'],
400                               len(self.args['data']),
401                               self.args['service_root'])
402                 # Tick the 'done' counter for the number of replica
403                 # reported stored by the server, for the case that
404                 # we're talking to a proxy or other backend that
405                 # stores to multiple copies for us.
406                 try:
407                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
408                 except (KeyError, ValueError):
409                     replicas_stored = 1
410                 limiter.save_response(result.content.strip(), replicas_stored)
411             elif status is not None:
412                 _logger.debug("Request fail: PUT %s => %s %s",
413                               self.args['data_hash'], status,
414                               self.service.last_result.content)
415
416
417     def __init__(self, api_client=None, proxy=None,
418                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
419                  api_token=None, local_store=None, block_cache=None,
420                  num_retries=0, session=None):
421         """Initialize a new KeepClient.
422
423         Arguments:
424         :api_client:
425           The API client to use to find Keep services.  If not
426           provided, KeepClient will build one from available Arvados
427           configuration.
428
429         :proxy:
430           If specified, this KeepClient will send requests to this Keep
431           proxy.  Otherwise, KeepClient will fall back to the setting of the
432           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
433           KeepClient does not use a proxy, pass in an empty string.
434
435         :timeout:
436           The timeout (in seconds) for HTTP requests to Keep
437           non-proxy servers.  A tuple of two floats is interpreted as
438           (connection_timeout, read_timeout): see
439           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
440           Default: (2, 300).
441
442         :proxy_timeout:
443           The timeout (in seconds) for HTTP requests to
444           Keep proxies. A tuple of two floats is interpreted as
445           (connection_timeout, read_timeout). Default: (20, 300).
446
447         :api_token:
448           If you're not using an API client, but only talking
449           directly to a Keep proxy, this parameter specifies an API token
450           to authenticate Keep requests.  It is an error to specify both
451           api_client and api_token.  If you specify neither, KeepClient
452           will use one available from the Arvados configuration.
453
454         :local_store:
455           If specified, this KeepClient will bypass Keep
456           services, and save data to the named directory.  If unspecified,
457           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
458           environment variable.  If you want to ensure KeepClient does not
459           use local storage, pass in an empty string.  This is primarily
460           intended to mock a server for testing.
461
462         :num_retries:
463           The default number of times to retry failed requests.
464           This will be used as the default num_retries value when get() and
465           put() are called.  Default 0.
466
467         :session:
468           The requests.Session object to use for get() and put() requests.
469           Will create one if not specified.
470         """
471         self.lock = threading.Lock()
472         if proxy is None:
473             proxy = config.get('ARVADOS_KEEP_PROXY')
474         if api_token is None:
475             if api_client is None:
476                 api_token = config.get('ARVADOS_API_TOKEN')
477             else:
478                 api_token = api_client.api_token
479         elif api_client is not None:
480             raise ValueError(
481                 "can't build KeepClient with both API client and token")
482         if local_store is None:
483             local_store = os.environ.get('KEEP_LOCAL_STORE')
484
485         self.block_cache = block_cache if block_cache else KeepBlockCache()
486         self.timeout = timeout
487         self.proxy_timeout = proxy_timeout
488
489         if local_store:
490             self.local_store = local_store
491             self.get = self.local_store_get
492             self.put = self.local_store_put
493         else:
494             self.num_retries = num_retries
495             self.session = session if session is not None else requests.Session()
496             if proxy:
497                 if not proxy.endswith('/'):
498                     proxy += '/'
499                 self.api_token = api_token
500                 self._keep_services = [{
501                     'uuid': 'proxy',
502                     '_service_root': proxy,
503                     }]
504                 self.using_proxy = True
505                 self._static_services_list = True
506             else:
507                 # It's important to avoid instantiating an API client
508                 # unless we actually need one, for testing's sake.
509                 if api_client is None:
510                     api_client = arvados.api('v1')
511                 self.api_client = api_client
512                 self.api_token = api_client.api_token
513                 self._keep_services = None
514                 self.using_proxy = None
515                 self._static_services_list = False
516
517     def current_timeout(self):
518         """Return the appropriate timeout to use for this client: the proxy
519         timeout setting if the backend service is currently a proxy,
520         the regular timeout setting otherwise.
521         """
522         # TODO(twp): the timeout should be a property of a
523         # KeepService, not a KeepClient. See #4488.
524         return self.proxy_timeout if self.using_proxy else self.timeout
525
526     def build_services_list(self, force_rebuild=False):
527         if (self._static_services_list or
528               (self._keep_services and not force_rebuild)):
529             return
530         with self.lock:
531             try:
532                 keep_services = self.api_client.keep_services().accessible()
533             except Exception:  # API server predates Keep services.
534                 keep_services = self.api_client.keep_disks().list()
535
536             self._keep_services = keep_services.execute().get('items')
537             if not self._keep_services:
538                 raise arvados.errors.NoKeepServersError()
539
540             self.using_proxy = any(ks.get('service_type') == 'proxy'
541                                    for ks in self._keep_services)
542
543             # Precompute the base URI for each service.
544             for r in self._keep_services:
545                 r['_service_root'] = "{}://[{}]:{:d}/".format(
546                     'https' if r['service_ssl_flag'] else 'http',
547                     r['service_host'],
548                     r['service_port'])
549             _logger.debug(str(self._keep_services))
550
551     def _service_weight(self, data_hash, service_uuid):
552         """Compute the weight of a Keep service endpoint for a data
553         block with a known hash.
554
555         The weight is md5(h + u) where u is the last 15 characters of
556         the service endpoint's UUID.
557         """
558         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
559
560     def weighted_service_roots(self, data_hash, force_rebuild=False):
561         """Return an array of Keep service endpoints, in the order in
562         which they should be probed when reading or writing data with
563         the given hash.
564         """
565         self.build_services_list(force_rebuild)
566
567         # Sort the available services by weight (heaviest first) for
568         # this data_hash, and return their service_roots (base URIs)
569         # in that order.
570         sorted_roots = [
571             svc['_service_root'] for svc in sorted(
572                 self._keep_services,
573                 reverse=True,
574                 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
575         _logger.debug(data_hash + ': ' + str(sorted_roots))
576         return sorted_roots
577
578     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
579         # roots_map is a dictionary, mapping Keep service root strings
580         # to KeepService objects.  Poll for Keep services, and add any
581         # new ones to roots_map.  Return the current list of local
582         # root strings.
583         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
584         local_roots = self.weighted_service_roots(md5_s, force_rebuild)
585         for root in local_roots:
586             if root not in roots_map:
587                 roots_map[root] = self.KeepService(root, self.session, **headers)
588         return local_roots
589
590     @staticmethod
591     def _check_loop_result(result):
592         # KeepClient RetryLoops should save results as a 2-tuple: the
593         # actual result of the request, and the number of servers available
594         # to receive the request this round.
595         # This method returns True if there's a real result, False if
596         # there are no more servers available, otherwise None.
597         if isinstance(result, Exception):
598             return None
599         result, tried_server_count = result
600         if (result is not None) and (result is not False):
601             return True
602         elif tried_server_count < 1:
603             _logger.info("No more Keep services to try; giving up")
604             return False
605         else:
606             return None
607
608     def get_from_cache(self, loc):
609         """Fetch a block only if is in the cache, otherwise return None."""
610         slot = self.block_cache.get(loc)
611         if slot.ready.is_set():
612             return slot.get()
613         else:
614             return None
615
616     @retry.retry_method
617     def get(self, loc_s, num_retries=None):
618         """Get data from Keep.
619
620         This method fetches one or more blocks of data from Keep.  It
621         sends a request each Keep service registered with the API
622         server (or the proxy provided when this client was
623         instantiated), then each service named in location hints, in
624         sequence.  As soon as one service provides the data, it's
625         returned.
626
627         Arguments:
628         * loc_s: A string of one or more comma-separated locators to fetch.
629           This method returns the concatenation of these blocks.
630         * num_retries: The number of times to retry GET requests to
631           *each* Keep server if it returns temporary failures, with
632           exponential backoff.  Note that, in each loop, the method may try
633           to fetch data from every available Keep service, along with any
634           that are named in location hints in the locator.  The default value
635           is set when the KeepClient is initialized.
636         """
637         if ',' in loc_s:
638             return ''.join(self.get(x) for x in loc_s.split(','))
639         locator = KeepLocator(loc_s)
640         expect_hash = locator.md5sum
641         slot, first = self.block_cache.reserve_cache(expect_hash)
642         if not first:
643             v = slot.get()
644             return v
645
646         # See #3147 for a discussion of the loop implementation.  Highlights:
647         # * Refresh the list of Keep services after each failure, in case
648         #   it's being updated.
649         # * Retry until we succeed, we're out of retries, or every available
650         #   service has returned permanent failure.
651         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
652                       for hint in locator.hints if hint.startswith('K@')]
653         # Map root URLs their KeepService objects.
654         roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
655         blob = None
656         loop = retry.RetryLoop(num_retries, self._check_loop_result,
657                                backoff_start=2)
658         for tries_left in loop:
659             try:
660                 local_roots = self.map_new_services(
661                     roots_map, expect_hash,
662                     force_rebuild=(tries_left < num_retries))
663             except Exception as error:
664                 loop.save_result(error)
665                 continue
666
667             # Query KeepService objects that haven't returned
668             # permanent failure, in our specified shuffle order.
669             services_to_try = [roots_map[root]
670                                for root in (local_roots + hint_roots)
671                                if roots_map[root].usable()]
672             for keep_service in services_to_try:
673                 blob = keep_service.get(locator, timeout=self.current_timeout())
674                 if blob is not None:
675                     break
676             loop.save_result((blob, len(services_to_try)))
677
678         # Always cache the result, then return it if we succeeded.
679         slot.set(blob)
680         self.block_cache.cap_cache()
681         if loop.success():
682             return blob
683
684         try:
685             all_roots = local_roots + hint_roots
686         except NameError:
687             # We never successfully fetched local_roots.
688             all_roots = hint_roots
689         # Q: Including 403 is necessary for the Keep tests to continue
690         # passing, but maybe they should expect KeepReadError instead?
691         not_founds = sum(1 for key in all_roots
692                          if roots_map[key].last_status() in {403, 404, 410})
693         service_errors = ((key, roots_map[key].last_result)
694                           for key in all_roots)
695         if not roots_map:
696             raise arvados.errors.KeepReadError(
697                 "failed to read {}: no Keep services available ({})".format(
698                     loc_s, loop.last_result()))
699         elif not_founds == len(all_roots):
700             raise arvados.errors.NotFoundError(
701                 "{} not found".format(loc_s), service_errors)
702         else:
703             raise arvados.errors.KeepReadError(
704                 "failed to read {}".format(loc_s), service_errors, label="service")
705
706     @retry.retry_method
707     def put(self, data, copies=2, num_retries=None):
708         """Save data in Keep.
709
710         This method will get a list of Keep services from the API server, and
711         send the data to each one simultaneously in a new thread.  Once the
712         uploads are finished, if enough copies are saved, this method returns
713         the most recent HTTP response body.  If requests fail to upload
714         enough copies, this method raises KeepWriteError.
715
716         Arguments:
717         * data: The string of data to upload.
718         * copies: The number of copies that the user requires be saved.
719           Default 2.
720         * num_retries: The number of times to retry PUT requests to
721           *each* Keep server if it returns temporary failures, with
722           exponential backoff.  The default value is set when the
723           KeepClient is initialized.
724         """
725
726         if isinstance(data, unicode):
727             data = data.encode("ascii")
728         elif not isinstance(data, str):
729             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
730
731         data_hash = hashlib.md5(data).hexdigest()
732         if copies < 1:
733             return data_hash
734
735         headers = {}
736         if self.using_proxy:
737             # Tell the proxy how many copies we want it to store
738             headers['X-Keep-Desired-Replication'] = str(copies)
739         roots_map = {}
740         thread_limiter = KeepClient.ThreadLimiter(copies)
741         loop = retry.RetryLoop(num_retries, self._check_loop_result,
742                                backoff_start=2)
743         for tries_left in loop:
744             try:
745                 local_roots = self.map_new_services(
746                     roots_map, data_hash,
747                     force_rebuild=(tries_left < num_retries), **headers)
748             except Exception as error:
749                 loop.save_result(error)
750                 continue
751
752             threads = []
753             for service_root, ks in roots_map.iteritems():
754                 if ks.finished():
755                     continue
756                 t = KeepClient.KeepWriterThread(
757                     ks,
758                     data=data,
759                     data_hash=data_hash,
760                     service_root=service_root,
761                     thread_limiter=thread_limiter,
762                     timeout=self.current_timeout())
763                 t.start()
764                 threads.append(t)
765             for t in threads:
766                 t.join()
767             loop.save_result((thread_limiter.done() >= copies, len(threads)))
768
769         if loop.success():
770             return thread_limiter.response()
771         if not roots_map:
772             raise arvados.errors.KeepWriteError(
773                 "failed to write {}: no Keep services available ({})".format(
774                     data_hash, loop.last_result()))
775         else:
776             service_errors = ((key, roots_map[key].last_result)
777                               for key in local_roots
778                               if not roots_map[key].success_flag)
779             raise arvados.errors.KeepWriteError(
780                 "failed to write {} (wanted {} copies but wrote {})".format(
781                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
782
783     def local_store_put(self, data, copies=1, num_retries=None):
784         """A stub for put().
785
786         This method is used in place of the real put() method when
787         using local storage (see constructor's local_store argument).
788
789         copies and num_retries arguments are ignored: they are here
790         only for the sake of offering the same call signature as
791         put().
792
793         Data stored this way can be retrieved via local_store_get().
794         """
795         md5 = hashlib.md5(data).hexdigest()
796         locator = '%s+%d' % (md5, len(data))
797         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
798             f.write(data)
799         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
800                   os.path.join(self.local_store, md5))
801         return locator
802
803     def local_store_get(self, loc_s, num_retries=None):
804         """Companion to local_store_put()."""
805         try:
806             locator = KeepLocator(loc_s)
807         except ValueError:
808             raise arvados.errors.NotFoundError(
809                 "Invalid data locator: '%s'" % loc_s)
810         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
811             return ''
812         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
813             return f.read()
814
815     def is_cached(self, locator):
816         return self.block_cache.reserve_cache(expect_hash)