4823: Add flush() to ArvadosFile. Fix tests to avoid using internal APIs. Fix
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import logging
3 import os
4 import pprint
5 import sys
6 import types
7 import subprocess
8 import json
9 import UserDict
10 import re
11 import hashlib
12 import string
13 import bz2
14 import zlib
15 import fcntl
16 import time
17 import threading
18 import timer
19 import datetime
20 import ssl
21 import socket
22 import requests
23
24 import arvados
25 import arvados.config as config
26 import arvados.errors
27 import arvados.retry as retry
28 import arvados.util
29
30 _logger = logging.getLogger('arvados.keep')
31 global_client_object = None
32
33 class KeepLocator(object):
34     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
36
37     def __init__(self, locator_str):
38         self.hints = []
39         self._perm_sig = None
40         self._perm_expiry = None
41         pieces = iter(locator_str.split('+'))
42         self.md5sum = next(pieces)
43         try:
44             self.size = int(next(pieces))
45         except StopIteration:
46             self.size = None
47         for hint in pieces:
48             if self.HINT_RE.match(hint) is None:
49                 raise ValueError("unrecognized hint data {}".format(hint))
50             elif hint.startswith('A'):
51                 self.parse_permission_hint(hint)
52             else:
53                 self.hints.append(hint)
54
55     def __str__(self):
56         return '+'.join(
57             str(s) for s in [self.md5sum, self.size,
58                              self.permission_hint()] + self.hints
59             if s is not None)
60
61     def stripped(self):
62         return "%s+%i" % (self.md5sum, self.size)
63
64     def _make_hex_prop(name, length):
65         # Build and return a new property with the given name that
66         # must be a hex string of the given length.
67         data_name = '_{}'.format(name)
68         def getter(self):
69             return getattr(self, data_name)
70         def setter(self, hex_str):
71             if not arvados.util.is_hex(hex_str, length):
72                 raise ValueError("{} must be a {}-digit hex string: {}".
73                                  format(name, length, hex_str))
74             setattr(self, data_name, hex_str)
75         return property(getter, setter)
76
77     md5sum = _make_hex_prop('md5sum', 32)
78     perm_sig = _make_hex_prop('perm_sig', 40)
79
80     @property
81     def perm_expiry(self):
82         return self._perm_expiry
83
84     @perm_expiry.setter
85     def perm_expiry(self, value):
86         if not arvados.util.is_hex(value, 1, 8):
87             raise ValueError(
88                 "permission timestamp must be a hex Unix timestamp: {}".
89                 format(value))
90         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
91
92     def permission_hint(self):
93         data = [self.perm_sig, self.perm_expiry]
94         if None in data:
95             return None
96         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
97         return "A{}@{:08x}".format(*data)
98
99     def parse_permission_hint(self, s):
100         try:
101             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
102         except IndexError:
103             raise ValueError("bad permission hint {}".format(s))
104
105     def permission_expired(self, as_of_dt=None):
106         if self.perm_expiry is None:
107             return False
108         elif as_of_dt is None:
109             as_of_dt = datetime.datetime.now()
110         return self.perm_expiry <= as_of_dt
111
112
113 class Keep(object):
114     """Simple interface to a global KeepClient object.
115
116     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
117     own API client.  The global KeepClient will build an API client from the
118     current Arvados configuration, which may not match the one you built.
119     """
120     _last_key = None
121
122     @classmethod
123     def global_client_object(cls):
124         global global_client_object
125         # Previously, KeepClient would change its behavior at runtime based
126         # on these configuration settings.  We simulate that behavior here
127         # by checking the values and returning a new KeepClient if any of
128         # them have changed.
129         key = (config.get('ARVADOS_API_HOST'),
130                config.get('ARVADOS_API_TOKEN'),
131                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
132                config.get('ARVADOS_KEEP_PROXY'),
133                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
134                os.environ.get('KEEP_LOCAL_STORE'))
135         if (global_client_object is None) or (cls._last_key != key):
136             global_client_object = KeepClient()
137             cls._last_key = key
138         return global_client_object
139
140     @staticmethod
141     def get(locator, **kwargs):
142         return Keep.global_client_object().get(locator, **kwargs)
143
144     @staticmethod
145     def put(data, **kwargs):
146         return Keep.global_client_object().put(data, **kwargs)
147
148 class KeepBlockCache(object):
149     # Default RAM cache is 256MiB
150     def __init__(self, cache_max=(256 * 1024 * 1024)):
151         self.cache_max = cache_max
152         self._cache = []
153         self._cache_lock = threading.Lock()
154
155     class CacheSlot(object):
156         def __init__(self, locator):
157             self.locator = locator
158             self.ready = threading.Event()
159             self.content = None
160
161         def get(self):
162             self.ready.wait()
163             return self.content
164
165         def set(self, value):
166             self.content = value
167             self.ready.set()
168
169         def size(self):
170             if self.content is None:
171                 return 0
172             else:
173                 return len(self.content)
174
175     def cap_cache(self):
176         '''Cap the cache size to self.cache_max'''
177         with self._cache_lock:
178             # Select all slots except those where ready.is_set() and content is
179             # None (that means there was an error reading the block).
180             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
181             sm = sum([slot.size() for slot in self._cache])
182             while len(self._cache) > 0 and sm > self.cache_max:
183                 for i in xrange(len(self._cache)-1, -1, -1):
184                     if self._cache[i].ready.is_set():
185                         del self._cache[i]
186                         break
187                 sm = sum([slot.size() for slot in self._cache])
188
189     def _get(self, locator):
190         # Test if the locator is already in the cache
191         for i in xrange(0, len(self._cache)):
192             if self._cache[i].locator == locator:
193                 n = self._cache[i]
194                 if i != 0:
195                     # move it to the front
196                     del self._cache[i]
197                     self._cache.insert(0, n)
198                 return n
199         return None
200
201     def get(self, locator):
202         with self._cache_lock:
203             return self._get(locator)
204
205     def reserve_cache(self, locator):
206         '''Reserve a cache slot for the specified locator,
207         or return the existing slot.'''
208         with self._cache_lock:
209             n = self._get(locator)
210             if n:
211                 return n, False
212             else:
213                 # Add a new cache slot for the locator
214                 n = KeepBlockCache.CacheSlot(locator)
215                 self._cache.insert(0, n)
216                 return n, True
217
218 class KeepClient(object):
219
220     # Default Keep server connection timeout:  2 seconds
221     # Default Keep server read timeout:      300 seconds
222     # Default Keep proxy connection timeout:  20 seconds
223     # Default Keep proxy read timeout:       300 seconds
224     DEFAULT_TIMEOUT = (2, 300)
225     DEFAULT_PROXY_TIMEOUT = (20, 300)
226
227     class ThreadLimiter(object):
228         """
229         Limit the number of threads running at a given time to
230         {desired successes} minus {successes reported}. When successes
231         reported == desired, wake up the remaining threads and tell
232         them to quit.
233
234         Should be used in a "with" block.
235         """
236         def __init__(self, todo):
237             self._todo = todo
238             self._done = 0
239             self._response = None
240             self._todo_lock = threading.Semaphore(todo)
241             self._done_lock = threading.Lock()
242
243         def __enter__(self):
244             self._todo_lock.acquire()
245             return self
246
247         def __exit__(self, type, value, traceback):
248             self._todo_lock.release()
249
250         def shall_i_proceed(self):
251             """
252             Return true if the current thread should do stuff. Return
253             false if the current thread should just stop.
254             """
255             with self._done_lock:
256                 return (self._done < self._todo)
257
258         def save_response(self, response_body, replicas_stored):
259             """
260             Records a response body (a locator, possibly signed) returned by
261             the Keep server.  It is not necessary to save more than
262             one response, since we presume that any locator returned
263             in response to a successful request is valid.
264             """
265             with self._done_lock:
266                 self._done += replicas_stored
267                 self._response = response_body
268
269         def response(self):
270             """
271             Returns the body from the response to a PUT request.
272             """
273             with self._done_lock:
274                 return self._response
275
276         def done(self):
277             """
278             Return how many successes were reported.
279             """
280             with self._done_lock:
281                 return self._done
282
283
284     class KeepService(object):
285         # Make requests to a single Keep service, and track results.
286         HTTP_ERRORS = (requests.exceptions.RequestException,
287                        socket.error, ssl.SSLError)
288
289         def __init__(self, root, session, **headers):
290             self.root = root
291             self.last_result = None
292             self.success_flag = None
293             self.session = session
294             self.get_headers = {'Accept': 'application/octet-stream'}
295             self.get_headers.update(headers)
296             self.put_headers = headers
297
298         def usable(self):
299             return self.success_flag is not False
300
301         def finished(self):
302             return self.success_flag is not None
303
304         def last_status(self):
305             try:
306                 return self.last_result.status_code
307             except AttributeError:
308                 return None
309
310         def get(self, locator, timeout=None):
311             # locator is a KeepLocator object.
312             url = self.root + str(locator)
313             _logger.debug("Request: GET %s", url)
314             try:
315                 with timer.Timer() as t:
316                     result = self.session.get(url.encode('utf-8'),
317                                           headers=self.get_headers,
318                                           timeout=timeout)
319             except self.HTTP_ERRORS as e:
320                 _logger.debug("Request fail: GET %s => %s: %s",
321                               url, type(e), str(e))
322                 self.last_result = e
323             else:
324                 self.last_result = result
325                 self.success_flag = retry.check_http_response_success(result)
326                 content = result.content
327                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
328                              self.last_status(), len(content), t.msecs,
329                              (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
330                 if self.success_flag:
331                     resp_md5 = hashlib.md5(content).hexdigest()
332                     if resp_md5 == locator.md5sum:
333                         return content
334                     _logger.warning("Checksum fail: md5(%s) = %s",
335                                     url, resp_md5)
336             return None
337
338         def put(self, hash_s, body, timeout=None):
339             url = self.root + hash_s
340             _logger.debug("Request: PUT %s", url)
341             try:
342                 result = self.session.put(url.encode('utf-8'),
343                                       data=body,
344                                       headers=self.put_headers,
345                                       timeout=timeout)
346             except self.HTTP_ERRORS as e:
347                 _logger.debug("Request fail: PUT %s => %s: %s",
348                               url, type(e), str(e))
349                 self.last_result = e
350             else:
351                 self.last_result = result
352                 self.success_flag = retry.check_http_response_success(result)
353             return self.success_flag
354
355
356     class KeepWriterThread(threading.Thread):
357         """
358         Write a blob of data to the given Keep server. On success, call
359         save_response() of the given ThreadLimiter to save the returned
360         locator.
361         """
362         def __init__(self, keep_service, **kwargs):
363             super(KeepClient.KeepWriterThread, self).__init__()
364             self.service = keep_service
365             self.args = kwargs
366             self._success = False
367
368         def success(self):
369             return self._success
370
371         def run(self):
372             with self.args['thread_limiter'] as limiter:
373                 if not limiter.shall_i_proceed():
374                     # My turn arrived, but the job has been done without
375                     # me.
376                     return
377                 self.run_with_limiter(limiter)
378
379         def run_with_limiter(self, limiter):
380             if self.service.finished():
381                 return
382             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
383                           str(threading.current_thread()),
384                           self.args['data_hash'],
385                           len(self.args['data']),
386                           self.args['service_root'])
387             self._success = bool(self.service.put(
388                 self.args['data_hash'],
389                 self.args['data'],
390                 timeout=self.args.get('timeout', None)))
391             status = self.service.last_status()
392             if self._success:
393                 result = self.service.last_result
394                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
395                               str(threading.current_thread()),
396                               self.args['data_hash'],
397                               len(self.args['data']),
398                               self.args['service_root'])
399                 # Tick the 'done' counter for the number of replica
400                 # reported stored by the server, for the case that
401                 # we're talking to a proxy or other backend that
402                 # stores to multiple copies for us.
403                 try:
404                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
405                 except (KeyError, ValueError):
406                     replicas_stored = 1
407                 limiter.save_response(result.content.strip(), replicas_stored)
408             elif status is not None:
409                 _logger.debug("Request fail: PUT %s => %s %s",
410                               self.args['data_hash'], status,
411                               self.service.last_result.content)
412
413
414     def __init__(self, api_client=None, proxy=None,
415                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
416                  api_token=None, local_store=None, block_cache=None,
417                  num_retries=0, session=None):
418         """Initialize a new KeepClient.
419
420         Arguments:
421         :api_client:
422           The API client to use to find Keep services.  If not
423           provided, KeepClient will build one from available Arvados
424           configuration.
425
426         :proxy:
427           If specified, this KeepClient will send requests to this Keep
428           proxy.  Otherwise, KeepClient will fall back to the setting of the
429           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
430           KeepClient does not use a proxy, pass in an empty string.
431
432         :timeout:
433           The timeout (in seconds) for HTTP requests to Keep
434           non-proxy servers.  A tuple of two floats is interpreted as
435           (connection_timeout, read_timeout): see
436           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
437           Default: (2, 300).
438
439         :proxy_timeout:
440           The timeout (in seconds) for HTTP requests to
441           Keep proxies. A tuple of two floats is interpreted as
442           (connection_timeout, read_timeout). Default: (20, 300).
443
444         :api_token:
445           If you're not using an API client, but only talking
446           directly to a Keep proxy, this parameter specifies an API token
447           to authenticate Keep requests.  It is an error to specify both
448           api_client and api_token.  If you specify neither, KeepClient
449           will use one available from the Arvados configuration.
450
451         :local_store:
452           If specified, this KeepClient will bypass Keep
453           services, and save data to the named directory.  If unspecified,
454           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
455           environment variable.  If you want to ensure KeepClient does not
456           use local storage, pass in an empty string.  This is primarily
457           intended to mock a server for testing.
458
459         :num_retries:
460           The default number of times to retry failed requests.
461           This will be used as the default num_retries value when get() and
462           put() are called.  Default 0.
463
464         :session:
465           The requests.Session object to use for get() and put() requests.
466           Will create one if not specified.
467         """
468         self.lock = threading.Lock()
469         if proxy is None:
470             proxy = config.get('ARVADOS_KEEP_PROXY')
471         if api_token is None:
472             if api_client is None:
473                 api_token = config.get('ARVADOS_API_TOKEN')
474             else:
475                 api_token = api_client.api_token
476         elif api_client is not None:
477             raise ValueError(
478                 "can't build KeepClient with both API client and token")
479         if local_store is None:
480             local_store = os.environ.get('KEEP_LOCAL_STORE')
481
482         self.block_cache = block_cache if block_cache else KeepBlockCache()
483         self.timeout = timeout
484         self.proxy_timeout = proxy_timeout
485
486         if local_store:
487             self.local_store = local_store
488             self.get = self.local_store_get
489             self.put = self.local_store_put
490         else:
491             self.num_retries = num_retries
492             self.session = session if session is not None else requests.Session()
493             if proxy:
494                 if not proxy.endswith('/'):
495                     proxy += '/'
496                 self.api_token = api_token
497                 self._keep_services = [{
498                     'uuid': 'proxy',
499                     '_service_root': proxy,
500                     }]
501                 self.using_proxy = True
502                 self._static_services_list = True
503             else:
504                 # It's important to avoid instantiating an API client
505                 # unless we actually need one, for testing's sake.
506                 if api_client is None:
507                     api_client = arvados.api('v1')
508                 self.api_client = api_client
509                 self.api_token = api_client.api_token
510                 self._keep_services = None
511                 self.using_proxy = None
512                 self._static_services_list = False
513
514     def current_timeout(self):
515         """Return the appropriate timeout to use for this client: the proxy
516         timeout setting if the backend service is currently a proxy,
517         the regular timeout setting otherwise.
518         """
519         # TODO(twp): the timeout should be a property of a
520         # KeepService, not a KeepClient. See #4488.
521         return self.proxy_timeout if self.using_proxy else self.timeout
522
523     def build_services_list(self, force_rebuild=False):
524         if (self._static_services_list or
525               (self._keep_services and not force_rebuild)):
526             return
527         with self.lock:
528             try:
529                 keep_services = self.api_client.keep_services().accessible()
530             except Exception:  # API server predates Keep services.
531                 keep_services = self.api_client.keep_disks().list()
532
533             self._keep_services = keep_services.execute().get('items')
534             if not self._keep_services:
535                 raise arvados.errors.NoKeepServersError()
536
537             self.using_proxy = any(ks.get('service_type') == 'proxy'
538                                    for ks in self._keep_services)
539
540             # Precompute the base URI for each service.
541             for r in self._keep_services:
542                 r['_service_root'] = "{}://[{}]:{:d}/".format(
543                     'https' if r['service_ssl_flag'] else 'http',
544                     r['service_host'],
545                     r['service_port'])
546             _logger.debug(str(self._keep_services))
547
548     def _service_weight(self, data_hash, service_uuid):
549         """Compute the weight of a Keep service endpoint for a data
550         block with a known hash.
551
552         The weight is md5(h + u) where u is the last 15 characters of
553         the service endpoint's UUID.
554         """
555         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
556
557     def weighted_service_roots(self, data_hash, force_rebuild=False):
558         """Return an array of Keep service endpoints, in the order in
559         which they should be probed when reading or writing data with
560         the given hash.
561         """
562         self.build_services_list(force_rebuild)
563
564         # Sort the available services by weight (heaviest first) for
565         # this data_hash, and return their service_roots (base URIs)
566         # in that order.
567         sorted_roots = [
568             svc['_service_root'] for svc in sorted(
569                 self._keep_services,
570                 reverse=True,
571                 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
572         _logger.debug(data_hash + ': ' + str(sorted_roots))
573         return sorted_roots
574
575     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
576         # roots_map is a dictionary, mapping Keep service root strings
577         # to KeepService objects.  Poll for Keep services, and add any
578         # new ones to roots_map.  Return the current list of local
579         # root strings.
580         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
581         local_roots = self.weighted_service_roots(md5_s, force_rebuild)
582         for root in local_roots:
583             if root not in roots_map:
584                 roots_map[root] = self.KeepService(root, self.session, **headers)
585         return local_roots
586
587     @staticmethod
588     def _check_loop_result(result):
589         # KeepClient RetryLoops should save results as a 2-tuple: the
590         # actual result of the request, and the number of servers available
591         # to receive the request this round.
592         # This method returns True if there's a real result, False if
593         # there are no more servers available, otherwise None.
594         if isinstance(result, Exception):
595             return None
596         result, tried_server_count = result
597         if (result is not None) and (result is not False):
598             return True
599         elif tried_server_count < 1:
600             _logger.info("No more Keep services to try; giving up")
601             return False
602         else:
603             return None
604
605     def get_from_cache(self, loc):
606         """Fetch a block only if is in the cache, otherwise return None."""
607         slot = self.block_cache.get(loc)
608         if slot.ready.is_set():
609             return slot.get()
610         else:
611             return None
612
613     @retry.retry_method
614     def get(self, loc_s, num_retries=None):
615         """Get data from Keep.
616
617         This method fetches one or more blocks of data from Keep.  It
618         sends a request each Keep service registered with the API
619         server (or the proxy provided when this client was
620         instantiated), then each service named in location hints, in
621         sequence.  As soon as one service provides the data, it's
622         returned.
623
624         Arguments:
625         * loc_s: A string of one or more comma-separated locators to fetch.
626           This method returns the concatenation of these blocks.
627         * num_retries: The number of times to retry GET requests to
628           *each* Keep server if it returns temporary failures, with
629           exponential backoff.  Note that, in each loop, the method may try
630           to fetch data from every available Keep service, along with any
631           that are named in location hints in the locator.  The default value
632           is set when the KeepClient is initialized.
633         """
634         if ',' in loc_s:
635             return ''.join(self.get(x) for x in loc_s.split(','))
636         locator = KeepLocator(loc_s)
637         expect_hash = locator.md5sum
638         slot, first = self.block_cache.reserve_cache(expect_hash)
639         if not first:
640             v = slot.get()
641             return v
642
643         # See #3147 for a discussion of the loop implementation.  Highlights:
644         # * Refresh the list of Keep services after each failure, in case
645         #   it's being updated.
646         # * Retry until we succeed, we're out of retries, or every available
647         #   service has returned permanent failure.
648         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
649                       for hint in locator.hints if hint.startswith('K@')]
650         # Map root URLs their KeepService objects.
651         roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
652         blob = None
653         loop = retry.RetryLoop(num_retries, self._check_loop_result,
654                                backoff_start=2)
655         for tries_left in loop:
656             try:
657                 local_roots = self.map_new_services(
658                     roots_map, expect_hash,
659                     force_rebuild=(tries_left < num_retries))
660             except Exception as error:
661                 loop.save_result(error)
662                 continue
663
664             # Query KeepService objects that haven't returned
665             # permanent failure, in our specified shuffle order.
666             services_to_try = [roots_map[root]
667                                for root in (local_roots + hint_roots)
668                                if roots_map[root].usable()]
669             for keep_service in services_to_try:
670                 blob = keep_service.get(locator, timeout=self.current_timeout())
671                 if blob is not None:
672                     break
673             loop.save_result((blob, len(services_to_try)))
674
675         # Always cache the result, then return it if we succeeded.
676         slot.set(blob)
677         self.block_cache.cap_cache()
678         if loop.success():
679             return blob
680
681         try:
682             all_roots = local_roots + hint_roots
683         except NameError:
684             # We never successfully fetched local_roots.
685             all_roots = hint_roots
686         # Q: Including 403 is necessary for the Keep tests to continue
687         # passing, but maybe they should expect KeepReadError instead?
688         not_founds = sum(1 for key in all_roots
689                          if roots_map[key].last_status() in {403, 404, 410})
690         service_errors = ((key, roots_map[key].last_result)
691                           for key in all_roots)
692         if not roots_map:
693             raise arvados.errors.KeepReadError(
694                 "failed to read {}: no Keep services available ({})".format(
695                     loc_s, loop.last_result()))
696         elif not_founds == len(all_roots):
697             raise arvados.errors.NotFoundError(
698                 "{} not found".format(loc_s), service_errors)
699         else:
700             raise arvados.errors.KeepReadError(
701                 "failed to read {}".format(loc_s), service_errors, label="service")
702
703     @retry.retry_method
704     def put(self, data, copies=2, num_retries=None):
705         """Save data in Keep.
706
707         This method will get a list of Keep services from the API server, and
708         send the data to each one simultaneously in a new thread.  Once the
709         uploads are finished, if enough copies are saved, this method returns
710         the most recent HTTP response body.  If requests fail to upload
711         enough copies, this method raises KeepWriteError.
712
713         Arguments:
714         * data: The string of data to upload.
715         * copies: The number of copies that the user requires be saved.
716           Default 2.
717         * num_retries: The number of times to retry PUT requests to
718           *each* Keep server if it returns temporary failures, with
719           exponential backoff.  The default value is set when the
720           KeepClient is initialized.
721         """
722
723         if isinstance(data, unicode):
724             data = data.encode("ascii")
725         elif not isinstance(data, str):
726             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
727
728         data_hash = hashlib.md5(data).hexdigest()
729         if copies < 1:
730             return data_hash
731
732         headers = {}
733         if self.using_proxy:
734             # Tell the proxy how many copies we want it to store
735             headers['X-Keep-Desired-Replication'] = str(copies)
736         roots_map = {}
737         thread_limiter = KeepClient.ThreadLimiter(copies)
738         loop = retry.RetryLoop(num_retries, self._check_loop_result,
739                                backoff_start=2)
740         for tries_left in loop:
741             try:
742                 local_roots = self.map_new_services(
743                     roots_map, data_hash,
744                     force_rebuild=(tries_left < num_retries), **headers)
745             except Exception as error:
746                 loop.save_result(error)
747                 continue
748
749             threads = []
750             for service_root, ks in roots_map.iteritems():
751                 if ks.finished():
752                     continue
753                 t = KeepClient.KeepWriterThread(
754                     ks,
755                     data=data,
756                     data_hash=data_hash,
757                     service_root=service_root,
758                     thread_limiter=thread_limiter,
759                     timeout=self.current_timeout())
760                 t.start()
761                 threads.append(t)
762             for t in threads:
763                 t.join()
764             loop.save_result((thread_limiter.done() >= copies, len(threads)))
765
766         if loop.success():
767             return thread_limiter.response()
768         if not roots_map:
769             raise arvados.errors.KeepWriteError(
770                 "failed to write {}: no Keep services available ({})".format(
771                     data_hash, loop.last_result()))
772         else:
773             service_errors = ((key, roots_map[key].last_result)
774                               for key in local_roots
775                               if not roots_map[key].success_flag)
776             raise arvados.errors.KeepWriteError(
777                 "failed to write {} (wanted {} copies but wrote {})".format(
778                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
779
780     def local_store_put(self, data, copies=1, num_retries=None):
781         """A stub for put().
782
783         This method is used in place of the real put() method when
784         using local storage (see constructor's local_store argument).
785
786         copies and num_retries arguments are ignored: they are here
787         only for the sake of offering the same call signature as
788         put().
789
790         Data stored this way can be retrieved via local_store_get().
791         """
792         md5 = hashlib.md5(data).hexdigest()
793         locator = '%s+%d' % (md5, len(data))
794         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
795             f.write(data)
796         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
797                   os.path.join(self.local_store, md5))
798         return locator
799
800     def local_store_get(self, loc_s, num_retries=None):
801         """Companion to local_store_put()."""
802         try:
803             locator = KeepLocator(loc_s)
804         except ValueError:
805             raise arvados.errors.NotFoundError(
806                 "Invalid data locator: '%s'" % loc_s)
807         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
808             return ''
809         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
810             return f.read()
811
812     def is_cached(self, locator):
813         return self.block_cache.reserve_cache(expect_hash)