5246: Add urllib3 workaround, along with a note why it is necessary.
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import logging
3 import os
4 import pprint
5 import sys
6 import types
7 import subprocess
8 import json
9 import UserDict
10 import re
11 import hashlib
12 import string
13 import bz2
14 import zlib
15 import fcntl
16 import time
17 import threading
18 import timer
19 import datetime
20 import ssl
21 import socket
22 import requests
23
24 import arvados
25 import arvados.config as config
26 import arvados.errors
27 import arvados.retry as retry
28 import arvados.util
29
30 try:
31     # Workaround for urllib3 bug.
32     # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
33     # However, urllib3 prior to version 1.10 has a major bug in this feature
34     # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
35     # Unfortunately a certain major Linux distribution is stablizing on urllib3
36     # 1.9.1 which means the following workaround is necessary to be able to use
37     # the arvados python sdk with the distribution-provided packages.
38
39     import urllib3
40     urllib3_ok = False
41     urllib3version = re.match(r'(\d+)\.(\d+)\.(\d+)', urllib3.__version__)
42     if (urllib3version and
43         int(urllib3version.group(1)) == 1 and
44         int(urllib3version.group(2)) >= 10):
45         urllib3_ok = True
46     if not urllib3_ok:
47         from urllib3.contrib import pyopenssl
48         pyopenssl.extract_from_urllib3()
49 except ImportError:
50     pass
51
52 _logger = logging.getLogger('arvados.keep')
53 global_client_object = None
54
55 class KeepLocator(object):
56     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
57     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
58
59     def __init__(self, locator_str):
60         self.hints = []
61         self._perm_sig = None
62         self._perm_expiry = None
63         pieces = iter(locator_str.split('+'))
64         self.md5sum = next(pieces)
65         try:
66             self.size = int(next(pieces))
67         except StopIteration:
68             self.size = None
69         for hint in pieces:
70             if self.HINT_RE.match(hint) is None:
71                 raise ValueError("unrecognized hint data {}".format(hint))
72             elif hint.startswith('A'):
73                 self.parse_permission_hint(hint)
74             else:
75                 self.hints.append(hint)
76
77     def __str__(self):
78         return '+'.join(
79             str(s) for s in [self.md5sum, self.size,
80                              self.permission_hint()] + self.hints
81             if s is not None)
82
83     def stripped(self):
84         if self.size is not None:
85             return "%s+%i" % (self.md5sum, self.size)
86         else:
87             return self.md5sum
88
89     def _make_hex_prop(name, length):
90         # Build and return a new property with the given name that
91         # must be a hex string of the given length.
92         data_name = '_{}'.format(name)
93         def getter(self):
94             return getattr(self, data_name)
95         def setter(self, hex_str):
96             if not arvados.util.is_hex(hex_str, length):
97                 raise ValueError("{} must be a {}-digit hex string: {}".
98                                  format(name, length, hex_str))
99             setattr(self, data_name, hex_str)
100         return property(getter, setter)
101
102     md5sum = _make_hex_prop('md5sum', 32)
103     perm_sig = _make_hex_prop('perm_sig', 40)
104
105     @property
106     def perm_expiry(self):
107         return self._perm_expiry
108
109     @perm_expiry.setter
110     def perm_expiry(self, value):
111         if not arvados.util.is_hex(value, 1, 8):
112             raise ValueError(
113                 "permission timestamp must be a hex Unix timestamp: {}".
114                 format(value))
115         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
116
117     def permission_hint(self):
118         data = [self.perm_sig, self.perm_expiry]
119         if None in data:
120             return None
121         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
122         return "A{}@{:08x}".format(*data)
123
124     def parse_permission_hint(self, s):
125         try:
126             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
127         except IndexError:
128             raise ValueError("bad permission hint {}".format(s))
129
130     def permission_expired(self, as_of_dt=None):
131         if self.perm_expiry is None:
132             return False
133         elif as_of_dt is None:
134             as_of_dt = datetime.datetime.now()
135         return self.perm_expiry <= as_of_dt
136
137
138 class Keep(object):
139     """Simple interface to a global KeepClient object.
140
141     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
142     own API client.  The global KeepClient will build an API client from the
143     current Arvados configuration, which may not match the one you built.
144     """
145     _last_key = None
146
147     @classmethod
148     def global_client_object(cls):
149         global global_client_object
150         # Previously, KeepClient would change its behavior at runtime based
151         # on these configuration settings.  We simulate that behavior here
152         # by checking the values and returning a new KeepClient if any of
153         # them have changed.
154         key = (config.get('ARVADOS_API_HOST'),
155                config.get('ARVADOS_API_TOKEN'),
156                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
157                config.get('ARVADOS_KEEP_PROXY'),
158                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
159                os.environ.get('KEEP_LOCAL_STORE'))
160         if (global_client_object is None) or (cls._last_key != key):
161             global_client_object = KeepClient()
162             cls._last_key = key
163         return global_client_object
164
165     @staticmethod
166     def get(locator, **kwargs):
167         return Keep.global_client_object().get(locator, **kwargs)
168
169     @staticmethod
170     def put(data, **kwargs):
171         return Keep.global_client_object().put(data, **kwargs)
172
173 class KeepBlockCache(object):
174     # Default RAM cache is 256MiB
175     def __init__(self, cache_max=(256 * 1024 * 1024)):
176         self.cache_max = cache_max
177         self._cache = []
178         self._cache_lock = threading.Lock()
179
180     class CacheSlot(object):
181         def __init__(self, locator):
182             self.locator = locator
183             self.ready = threading.Event()
184             self.content = None
185
186         def get(self):
187             self.ready.wait()
188             return self.content
189
190         def set(self, value):
191             self.content = value
192             self.ready.set()
193
194         def size(self):
195             if self.content is None:
196                 return 0
197             else:
198                 return len(self.content)
199
200     def cap_cache(self):
201         '''Cap the cache size to self.cache_max'''
202         with self._cache_lock:
203             # Select all slots except those where ready.is_set() and content is
204             # None (that means there was an error reading the block).
205             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
206             sm = sum([slot.size() for slot in self._cache])
207             while len(self._cache) > 0 and sm > self.cache_max:
208                 for i in xrange(len(self._cache)-1, -1, -1):
209                     if self._cache[i].ready.is_set():
210                         del self._cache[i]
211                         break
212                 sm = sum([slot.size() for slot in self._cache])
213
214     def _get(self, locator):
215         # Test if the locator is already in the cache
216         for i in xrange(0, len(self._cache)):
217             if self._cache[i].locator == locator:
218                 n = self._cache[i]
219                 if i != 0:
220                     # move it to the front
221                     del self._cache[i]
222                     self._cache.insert(0, n)
223                 return n
224         return None
225
226     def get(self, locator):
227         with self._cache_lock:
228             return self._get(locator)
229
230     def reserve_cache(self, locator):
231         '''Reserve a cache slot for the specified locator,
232         or return the existing slot.'''
233         with self._cache_lock:
234             n = self._get(locator)
235             if n:
236                 return n, False
237             else:
238                 # Add a new cache slot for the locator
239                 n = KeepBlockCache.CacheSlot(locator)
240                 self._cache.insert(0, n)
241                 return n, True
242
243 class KeepClient(object):
244
245     # Default Keep server connection timeout:  2 seconds
246     # Default Keep server read timeout:      300 seconds
247     # Default Keep proxy connection timeout:  20 seconds
248     # Default Keep proxy read timeout:       300 seconds
249     DEFAULT_TIMEOUT = (2, 300)
250     DEFAULT_PROXY_TIMEOUT = (20, 300)
251
252     class ThreadLimiter(object):
253         """
254         Limit the number of threads running at a given time to
255         {desired successes} minus {successes reported}. When successes
256         reported == desired, wake up the remaining threads and tell
257         them to quit.
258
259         Should be used in a "with" block.
260         """
261         def __init__(self, todo):
262             self._todo = todo
263             self._done = 0
264             self._response = None
265             self._todo_lock = threading.Semaphore(todo)
266             self._done_lock = threading.Lock()
267
268         def __enter__(self):
269             self._todo_lock.acquire()
270             return self
271
272         def __exit__(self, type, value, traceback):
273             self._todo_lock.release()
274
275         def shall_i_proceed(self):
276             """
277             Return true if the current thread should do stuff. Return
278             false if the current thread should just stop.
279             """
280             with self._done_lock:
281                 return (self._done < self._todo)
282
283         def save_response(self, response_body, replicas_stored):
284             """
285             Records a response body (a locator, possibly signed) returned by
286             the Keep server.  It is not necessary to save more than
287             one response, since we presume that any locator returned
288             in response to a successful request is valid.
289             """
290             with self._done_lock:
291                 self._done += replicas_stored
292                 self._response = response_body
293
294         def response(self):
295             """
296             Returns the body from the response to a PUT request.
297             """
298             with self._done_lock:
299                 return self._response
300
301         def done(self):
302             """
303             Return how many successes were reported.
304             """
305             with self._done_lock:
306                 return self._done
307
308
309     class KeepService(object):
310         # Make requests to a single Keep service, and track results.
311         HTTP_ERRORS = (requests.exceptions.RequestException,
312                        socket.error, ssl.SSLError)
313
314         def __init__(self, root, session, **headers):
315             self.root = root
316             self.last_result = None
317             self.success_flag = None
318             self.session = session
319             self.get_headers = {'Accept': 'application/octet-stream'}
320             self.get_headers.update(headers)
321             self.put_headers = headers
322
323         def usable(self):
324             return self.success_flag is not False
325
326         def finished(self):
327             return self.success_flag is not None
328
329         def last_status(self):
330             try:
331                 return self.last_result.status_code
332             except AttributeError:
333                 return None
334
335         def get(self, locator, timeout=None):
336             # locator is a KeepLocator object.
337             url = self.root + str(locator)
338             _logger.debug("Request: GET %s", url)
339             try:
340                 with timer.Timer() as t:
341                     result = self.session.get(url.encode('utf-8'),
342                                           headers=self.get_headers,
343                                           timeout=timeout)
344             except self.HTTP_ERRORS as e:
345                 _logger.debug("Request fail: GET %s => %s: %s",
346                               url, type(e), str(e))
347                 self.last_result = e
348             else:
349                 self.last_result = result
350                 self.success_flag = retry.check_http_response_success(result)
351                 content = result.content
352                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
353                              self.last_status(), len(content), t.msecs,
354                              (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
355                 if self.success_flag:
356                     resp_md5 = hashlib.md5(content).hexdigest()
357                     if resp_md5 == locator.md5sum:
358                         return content
359                     _logger.warning("Checksum fail: md5(%s) = %s",
360                                     url, resp_md5)
361             return None
362
363         def put(self, hash_s, body, timeout=None):
364             url = self.root + hash_s
365             _logger.debug("Request: PUT %s", url)
366             try:
367                 result = self.session.put(url.encode('utf-8'),
368                                       data=body,
369                                       headers=self.put_headers,
370                                       timeout=timeout)
371             except self.HTTP_ERRORS as e:
372                 _logger.debug("Request fail: PUT %s => %s: %s",
373                               url, type(e), str(e))
374                 self.last_result = e
375             else:
376                 self.last_result = result
377                 self.success_flag = retry.check_http_response_success(result)
378             return self.success_flag
379
380
381     class KeepWriterThread(threading.Thread):
382         """
383         Write a blob of data to the given Keep server. On success, call
384         save_response() of the given ThreadLimiter to save the returned
385         locator.
386         """
387         def __init__(self, keep_service, **kwargs):
388             super(KeepClient.KeepWriterThread, self).__init__()
389             self.service = keep_service
390             self.args = kwargs
391             self._success = False
392
393         def success(self):
394             return self._success
395
396         def run(self):
397             with self.args['thread_limiter'] as limiter:
398                 if not limiter.shall_i_proceed():
399                     # My turn arrived, but the job has been done without
400                     # me.
401                     return
402                 self.run_with_limiter(limiter)
403
404         def run_with_limiter(self, limiter):
405             if self.service.finished():
406                 return
407             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
408                           str(threading.current_thread()),
409                           self.args['data_hash'],
410                           len(self.args['data']),
411                           self.args['service_root'])
412             self._success = bool(self.service.put(
413                 self.args['data_hash'],
414                 self.args['data'],
415                 timeout=self.args.get('timeout', None)))
416             status = self.service.last_status()
417             if self._success:
418                 result = self.service.last_result
419                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
420                               str(threading.current_thread()),
421                               self.args['data_hash'],
422                               len(self.args['data']),
423                               self.args['service_root'])
424                 # Tick the 'done' counter for the number of replica
425                 # reported stored by the server, for the case that
426                 # we're talking to a proxy or other backend that
427                 # stores to multiple copies for us.
428                 try:
429                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
430                 except (KeyError, ValueError):
431                     replicas_stored = 1
432                 limiter.save_response(result.content.strip(), replicas_stored)
433             elif status is not None:
434                 _logger.debug("Request fail: PUT %s => %s %s",
435                               self.args['data_hash'], status,
436                               self.service.last_result.content)
437
438
439     def __init__(self, api_client=None, proxy=None,
440                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
441                  api_token=None, local_store=None, block_cache=None,
442                  num_retries=0, session=None):
443         """Initialize a new KeepClient.
444
445         Arguments:
446         :api_client:
447           The API client to use to find Keep services.  If not
448           provided, KeepClient will build one from available Arvados
449           configuration.
450
451         :proxy:
452           If specified, this KeepClient will send requests to this Keep
453           proxy.  Otherwise, KeepClient will fall back to the setting of the
454           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
455           KeepClient does not use a proxy, pass in an empty string.
456
457         :timeout:
458           The timeout (in seconds) for HTTP requests to Keep
459           non-proxy servers.  A tuple of two floats is interpreted as
460           (connection_timeout, read_timeout): see
461           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
462           Default: (2, 300).
463
464         :proxy_timeout:
465           The timeout (in seconds) for HTTP requests to
466           Keep proxies. A tuple of two floats is interpreted as
467           (connection_timeout, read_timeout). Default: (20, 300).
468
469         :api_token:
470           If you're not using an API client, but only talking
471           directly to a Keep proxy, this parameter specifies an API token
472           to authenticate Keep requests.  It is an error to specify both
473           api_client and api_token.  If you specify neither, KeepClient
474           will use one available from the Arvados configuration.
475
476         :local_store:
477           If specified, this KeepClient will bypass Keep
478           services, and save data to the named directory.  If unspecified,
479           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
480           environment variable.  If you want to ensure KeepClient does not
481           use local storage, pass in an empty string.  This is primarily
482           intended to mock a server for testing.
483
484         :num_retries:
485           The default number of times to retry failed requests.
486           This will be used as the default num_retries value when get() and
487           put() are called.  Default 0.
488
489         :session:
490           The requests.Session object to use for get() and put() requests.
491           Will create one if not specified.
492         """
493         self.lock = threading.Lock()
494         if proxy is None:
495             proxy = config.get('ARVADOS_KEEP_PROXY')
496         if api_token is None:
497             if api_client is None:
498                 api_token = config.get('ARVADOS_API_TOKEN')
499             else:
500                 api_token = api_client.api_token
501         elif api_client is not None:
502             raise ValueError(
503                 "can't build KeepClient with both API client and token")
504         if local_store is None:
505             local_store = os.environ.get('KEEP_LOCAL_STORE')
506
507         self.block_cache = block_cache if block_cache else KeepBlockCache()
508         self.timeout = timeout
509         self.proxy_timeout = proxy_timeout
510
511         if local_store:
512             self.local_store = local_store
513             self.get = self.local_store_get
514             self.put = self.local_store_put
515         else:
516             self.num_retries = num_retries
517             self.session = session if session is not None else requests.Session()
518             if proxy:
519                 if not proxy.endswith('/'):
520                     proxy += '/'
521                 self.api_token = api_token
522                 self._keep_services = [{
523                     'uuid': 'proxy',
524                     '_service_root': proxy,
525                     }]
526                 self.using_proxy = True
527                 self._static_services_list = True
528             else:
529                 # It's important to avoid instantiating an API client
530                 # unless we actually need one, for testing's sake.
531                 if api_client is None:
532                     api_client = arvados.api('v1')
533                 self.api_client = api_client
534                 self.api_token = api_client.api_token
535                 self._keep_services = None
536                 self.using_proxy = None
537                 self._static_services_list = False
538
539     def current_timeout(self):
540         """Return the appropriate timeout to use for this client: the proxy
541         timeout setting if the backend service is currently a proxy,
542         the regular timeout setting otherwise.
543         """
544         # TODO(twp): the timeout should be a property of a
545         # KeepService, not a KeepClient. See #4488.
546         return self.proxy_timeout if self.using_proxy else self.timeout
547
548     def build_services_list(self, force_rebuild=False):
549         if (self._static_services_list or
550               (self._keep_services and not force_rebuild)):
551             return
552         with self.lock:
553             try:
554                 keep_services = self.api_client.keep_services().accessible()
555             except Exception:  # API server predates Keep services.
556                 keep_services = self.api_client.keep_disks().list()
557
558             self._keep_services = keep_services.execute().get('items')
559             if not self._keep_services:
560                 raise arvados.errors.NoKeepServersError()
561
562             self.using_proxy = any(ks.get('service_type') == 'proxy'
563                                    for ks in self._keep_services)
564
565             # Precompute the base URI for each service.
566             for r in self._keep_services:
567                 r['_service_root'] = "{}://[{}]:{:d}/".format(
568                     'https' if r['service_ssl_flag'] else 'http',
569                     r['service_host'],
570                     r['service_port'])
571             _logger.debug(str(self._keep_services))
572
573     def _service_weight(self, data_hash, service_uuid):
574         """Compute the weight of a Keep service endpoint for a data
575         block with a known hash.
576
577         The weight is md5(h + u) where u is the last 15 characters of
578         the service endpoint's UUID.
579         """
580         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
581
582     def weighted_service_roots(self, data_hash, force_rebuild=False):
583         """Return an array of Keep service endpoints, in the order in
584         which they should be probed when reading or writing data with
585         the given hash.
586         """
587         self.build_services_list(force_rebuild)
588
589         # Sort the available services by weight (heaviest first) for
590         # this data_hash, and return their service_roots (base URIs)
591         # in that order.
592         sorted_roots = [
593             svc['_service_root'] for svc in sorted(
594                 self._keep_services,
595                 reverse=True,
596                 key=lambda svc: self._service_weight(data_hash, svc['uuid']))]
597         _logger.debug(data_hash + ': ' + str(sorted_roots))
598         return sorted_roots
599
600     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
601         # roots_map is a dictionary, mapping Keep service root strings
602         # to KeepService objects.  Poll for Keep services, and add any
603         # new ones to roots_map.  Return the current list of local
604         # root strings.
605         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
606         local_roots = self.weighted_service_roots(md5_s, force_rebuild)
607         for root in local_roots:
608             if root not in roots_map:
609                 roots_map[root] = self.KeepService(root, self.session, **headers)
610         return local_roots
611
612     @staticmethod
613     def _check_loop_result(result):
614         # KeepClient RetryLoops should save results as a 2-tuple: the
615         # actual result of the request, and the number of servers available
616         # to receive the request this round.
617         # This method returns True if there's a real result, False if
618         # there are no more servers available, otherwise None.
619         if isinstance(result, Exception):
620             return None
621         result, tried_server_count = result
622         if (result is not None) and (result is not False):
623             return True
624         elif tried_server_count < 1:
625             _logger.info("No more Keep services to try; giving up")
626             return False
627         else:
628             return None
629
630     def get_from_cache(self, loc):
631         """Fetch a block only if is in the cache, otherwise return None."""
632         slot = self.block_cache.get(loc)
633         if slot.ready.is_set():
634             return slot.get()
635         else:
636             return None
637
638     @retry.retry_method
639     def get(self, loc_s, num_retries=None):
640         """Get data from Keep.
641
642         This method fetches one or more blocks of data from Keep.  It
643         sends a request each Keep service registered with the API
644         server (or the proxy provided when this client was
645         instantiated), then each service named in location hints, in
646         sequence.  As soon as one service provides the data, it's
647         returned.
648
649         Arguments:
650         * loc_s: A string of one or more comma-separated locators to fetch.
651           This method returns the concatenation of these blocks.
652         * num_retries: The number of times to retry GET requests to
653           *each* Keep server if it returns temporary failures, with
654           exponential backoff.  Note that, in each loop, the method may try
655           to fetch data from every available Keep service, along with any
656           that are named in location hints in the locator.  The default value
657           is set when the KeepClient is initialized.
658         """
659         if ',' in loc_s:
660             return ''.join(self.get(x) for x in loc_s.split(','))
661         locator = KeepLocator(loc_s)
662         expect_hash = locator.md5sum
663         slot, first = self.block_cache.reserve_cache(expect_hash)
664         if not first:
665             v = slot.get()
666             return v
667
668         # See #3147 for a discussion of the loop implementation.  Highlights:
669         # * Refresh the list of Keep services after each failure, in case
670         #   it's being updated.
671         # * Retry until we succeed, we're out of retries, or every available
672         #   service has returned permanent failure.
673         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
674                       for hint in locator.hints if hint.startswith('K@')]
675         # Map root URLs their KeepService objects.
676         roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
677         blob = None
678         loop = retry.RetryLoop(num_retries, self._check_loop_result,
679                                backoff_start=2)
680         for tries_left in loop:
681             try:
682                 local_roots = self.map_new_services(
683                     roots_map, expect_hash,
684                     force_rebuild=(tries_left < num_retries))
685             except Exception as error:
686                 loop.save_result(error)
687                 continue
688
689             # Query KeepService objects that haven't returned
690             # permanent failure, in our specified shuffle order.
691             services_to_try = [roots_map[root]
692                                for root in (local_roots + hint_roots)
693                                if roots_map[root].usable()]
694             for keep_service in services_to_try:
695                 blob = keep_service.get(locator, timeout=self.current_timeout())
696                 if blob is not None:
697                     break
698             loop.save_result((blob, len(services_to_try)))
699
700         # Always cache the result, then return it if we succeeded.
701         slot.set(blob)
702         self.block_cache.cap_cache()
703         if loop.success():
704             return blob
705
706         try:
707             all_roots = local_roots + hint_roots
708         except NameError:
709             # We never successfully fetched local_roots.
710             all_roots = hint_roots
711         # Q: Including 403 is necessary for the Keep tests to continue
712         # passing, but maybe they should expect KeepReadError instead?
713         not_founds = sum(1 for key in all_roots
714                          if roots_map[key].last_status() in {403, 404, 410})
715         service_errors = ((key, roots_map[key].last_result)
716                           for key in all_roots)
717         if not roots_map:
718             raise arvados.errors.KeepReadError(
719                 "failed to read {}: no Keep services available ({})".format(
720                     loc_s, loop.last_result()))
721         elif not_founds == len(all_roots):
722             raise arvados.errors.NotFoundError(
723                 "{} not found".format(loc_s), service_errors)
724         else:
725             raise arvados.errors.KeepReadError(
726                 "failed to read {}".format(loc_s), service_errors, label="service")
727
728     @retry.retry_method
729     def put(self, data, copies=2, num_retries=None):
730         """Save data in Keep.
731
732         This method will get a list of Keep services from the API server, and
733         send the data to each one simultaneously in a new thread.  Once the
734         uploads are finished, if enough copies are saved, this method returns
735         the most recent HTTP response body.  If requests fail to upload
736         enough copies, this method raises KeepWriteError.
737
738         Arguments:
739         * data: The string of data to upload.
740         * copies: The number of copies that the user requires be saved.
741           Default 2.
742         * num_retries: The number of times to retry PUT requests to
743           *each* Keep server if it returns temporary failures, with
744           exponential backoff.  The default value is set when the
745           KeepClient is initialized.
746         """
747
748         if isinstance(data, unicode):
749             data = data.encode("ascii")
750         elif not isinstance(data, str):
751             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
752
753         data_hash = hashlib.md5(data).hexdigest()
754         if copies < 1:
755             return data_hash
756
757         headers = {}
758         if self.using_proxy:
759             # Tell the proxy how many copies we want it to store
760             headers['X-Keep-Desired-Replication'] = str(copies)
761         roots_map = {}
762         thread_limiter = KeepClient.ThreadLimiter(copies)
763         loop = retry.RetryLoop(num_retries, self._check_loop_result,
764                                backoff_start=2)
765         for tries_left in loop:
766             try:
767                 local_roots = self.map_new_services(
768                     roots_map, data_hash,
769                     force_rebuild=(tries_left < num_retries), **headers)
770             except Exception as error:
771                 loop.save_result(error)
772                 continue
773
774             threads = []
775             for service_root, ks in roots_map.iteritems():
776                 if ks.finished():
777                     continue
778                 t = KeepClient.KeepWriterThread(
779                     ks,
780                     data=data,
781                     data_hash=data_hash,
782                     service_root=service_root,
783                     thread_limiter=thread_limiter,
784                     timeout=self.current_timeout())
785                 t.start()
786                 threads.append(t)
787             for t in threads:
788                 t.join()
789             loop.save_result((thread_limiter.done() >= copies, len(threads)))
790
791         if loop.success():
792             return thread_limiter.response()
793         if not roots_map:
794             raise arvados.errors.KeepWriteError(
795                 "failed to write {}: no Keep services available ({})".format(
796                     data_hash, loop.last_result()))
797         else:
798             service_errors = ((key, roots_map[key].last_result)
799                               for key in local_roots
800                               if not roots_map[key].success_flag)
801             raise arvados.errors.KeepWriteError(
802                 "failed to write {} (wanted {} copies but wrote {})".format(
803                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
804
805     def local_store_put(self, data, copies=1, num_retries=None):
806         """A stub for put().
807
808         This method is used in place of the real put() method when
809         using local storage (see constructor's local_store argument).
810
811         copies and num_retries arguments are ignored: they are here
812         only for the sake of offering the same call signature as
813         put().
814
815         Data stored this way can be retrieved via local_store_get().
816         """
817         md5 = hashlib.md5(data).hexdigest()
818         locator = '%s+%d' % (md5, len(data))
819         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
820             f.write(data)
821         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
822                   os.path.join(self.local_store, md5))
823         return locator
824
825     def local_store_get(self, loc_s, num_retries=None):
826         """Companion to local_store_put()."""
827         try:
828             locator = KeepLocator(loc_s)
829         except ValueError:
830             raise arvados.errors.NotFoundError(
831                 "Invalid data locator: '%s'" % loc_s)
832         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
833             return ''
834         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
835             return f.read()
836
837     def is_cached(self, locator):
838         return self.block_cache.reserve_cache(expect_hash)