Check if cache slot is not None before trying to test the read flag. refs #5831
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import logging
3 import os
4 import pprint
5 import sys
6 import types
7 import subprocess
8 import json
9 import UserDict
10 import re
11 import hashlib
12 import string
13 import bz2
14 import zlib
15 import fcntl
16 import time
17 import threading
18 import timer
19 import datetime
20 import ssl
21 import socket
22 import requests
23
24 import arvados
25 import arvados.config as config
26 import arvados.errors
27 import arvados.retry as retry
28 import arvados.util
29
30 try:
31     # Workaround for urllib3 bug.
32     # The 'requests' library enables urllib3's SNI support by default, which uses pyopenssl.
33     # However, urllib3 prior to version 1.10 has a major bug in this feature
34     # (OpenSSL WantWriteError, https://github.com/shazow/urllib3/issues/412)
35     # Unfortunately Debian 8 is stabilizing on urllib3 1.9.1 which means the
36     # following workaround is necessary to be able to use
37     # the arvados python sdk with the distribution-provided packages.
38     import urllib3
39     from pkg_resources import parse_version
40     if parse_version(urllib3.__version__) < parse_version('1.10'):
41         from urllib3.contrib import pyopenssl
42         pyopenssl.extract_from_urllib3()
43 except ImportError:
44     pass
45
46 _logger = logging.getLogger('arvados.keep')
47 global_client_object = None
48
49 class KeepLocator(object):
50     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
51     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
52
53     def __init__(self, locator_str):
54         self.hints = []
55         self._perm_sig = None
56         self._perm_expiry = None
57         pieces = iter(locator_str.split('+'))
58         self.md5sum = next(pieces)
59         try:
60             self.size = int(next(pieces))
61         except StopIteration:
62             self.size = None
63         for hint in pieces:
64             if self.HINT_RE.match(hint) is None:
65                 raise ValueError("invalid hint format: {}".format(hint))
66             elif hint.startswith('A'):
67                 self.parse_permission_hint(hint)
68             else:
69                 self.hints.append(hint)
70
71     def __str__(self):
72         return '+'.join(
73             str(s) for s in [self.md5sum, self.size,
74                              self.permission_hint()] + self.hints
75             if s is not None)
76
77     def stripped(self):
78         if self.size is not None:
79             return "%s+%i" % (self.md5sum, self.size)
80         else:
81             return self.md5sum
82
83     def _make_hex_prop(name, length):
84         # Build and return a new property with the given name that
85         # must be a hex string of the given length.
86         data_name = '_{}'.format(name)
87         def getter(self):
88             return getattr(self, data_name)
89         def setter(self, hex_str):
90             if not arvados.util.is_hex(hex_str, length):
91                 raise ValueError("{} must be a {}-digit hex string: {}".
92                                  format(name, length, hex_str))
93             setattr(self, data_name, hex_str)
94         return property(getter, setter)
95
96     md5sum = _make_hex_prop('md5sum', 32)
97     perm_sig = _make_hex_prop('perm_sig', 40)
98
99     @property
100     def perm_expiry(self):
101         return self._perm_expiry
102
103     @perm_expiry.setter
104     def perm_expiry(self, value):
105         if not arvados.util.is_hex(value, 1, 8):
106             raise ValueError(
107                 "permission timestamp must be a hex Unix timestamp: {}".
108                 format(value))
109         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
110
111     def permission_hint(self):
112         data = [self.perm_sig, self.perm_expiry]
113         if None in data:
114             return None
115         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
116         return "A{}@{:08x}".format(*data)
117
118     def parse_permission_hint(self, s):
119         try:
120             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
121         except IndexError:
122             raise ValueError("bad permission hint {}".format(s))
123
124     def permission_expired(self, as_of_dt=None):
125         if self.perm_expiry is None:
126             return False
127         elif as_of_dt is None:
128             as_of_dt = datetime.datetime.now()
129         return self.perm_expiry <= as_of_dt
130
131
132 class Keep(object):
133     """Simple interface to a global KeepClient object.
134
135     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
136     own API client.  The global KeepClient will build an API client from the
137     current Arvados configuration, which may not match the one you built.
138     """
139     _last_key = None
140
141     @classmethod
142     def global_client_object(cls):
143         global global_client_object
144         # Previously, KeepClient would change its behavior at runtime based
145         # on these configuration settings.  We simulate that behavior here
146         # by checking the values and returning a new KeepClient if any of
147         # them have changed.
148         key = (config.get('ARVADOS_API_HOST'),
149                config.get('ARVADOS_API_TOKEN'),
150                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
151                config.get('ARVADOS_KEEP_PROXY'),
152                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
153                os.environ.get('KEEP_LOCAL_STORE'))
154         if (global_client_object is None) or (cls._last_key != key):
155             global_client_object = KeepClient()
156             cls._last_key = key
157         return global_client_object
158
159     @staticmethod
160     def get(locator, **kwargs):
161         return Keep.global_client_object().get(locator, **kwargs)
162
163     @staticmethod
164     def put(data, **kwargs):
165         return Keep.global_client_object().put(data, **kwargs)
166
167 class KeepBlockCache(object):
168     # Default RAM cache is 256MiB
169     def __init__(self, cache_max=(256 * 1024 * 1024)):
170         self.cache_max = cache_max
171         self._cache = []
172         self._cache_lock = threading.Lock()
173
174     class CacheSlot(object):
175         def __init__(self, locator):
176             self.locator = locator
177             self.ready = threading.Event()
178             self.content = None
179
180         def get(self):
181             self.ready.wait()
182             return self.content
183
184         def set(self, value):
185             self.content = value
186             self.ready.set()
187
188         def size(self):
189             if self.content is None:
190                 return 0
191             else:
192                 return len(self.content)
193
194     def cap_cache(self):
195         '''Cap the cache size to self.cache_max'''
196         with self._cache_lock:
197             # Select all slots except those where ready.is_set() and content is
198             # None (that means there was an error reading the block).
199             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
200             sm = sum([slot.size() for slot in self._cache])
201             while len(self._cache) > 0 and sm > self.cache_max:
202                 for i in xrange(len(self._cache)-1, -1, -1):
203                     if self._cache[i].ready.is_set():
204                         del self._cache[i]
205                         break
206                 sm = sum([slot.size() for slot in self._cache])
207
208     def _get(self, locator):
209         # Test if the locator is already in the cache
210         for i in xrange(0, len(self._cache)):
211             if self._cache[i].locator == locator:
212                 n = self._cache[i]
213                 if i != 0:
214                     # move it to the front
215                     del self._cache[i]
216                     self._cache.insert(0, n)
217                 return n
218         return None
219
220     def get(self, locator):
221         with self._cache_lock:
222             return self._get(locator)
223
224     def reserve_cache(self, locator):
225         '''Reserve a cache slot for the specified locator,
226         or return the existing slot.'''
227         with self._cache_lock:
228             n = self._get(locator)
229             if n:
230                 return n, False
231             else:
232                 # Add a new cache slot for the locator
233                 n = KeepBlockCache.CacheSlot(locator)
234                 self._cache.insert(0, n)
235                 return n, True
236
237 class KeepClient(object):
238
239     # Default Keep server connection timeout:  2 seconds
240     # Default Keep server read timeout:      300 seconds
241     # Default Keep proxy connection timeout:  20 seconds
242     # Default Keep proxy read timeout:       300 seconds
243     DEFAULT_TIMEOUT = (2, 300)
244     DEFAULT_PROXY_TIMEOUT = (20, 300)
245
246     class ThreadLimiter(object):
247         """
248         Limit the number of threads running at a given time to
249         {desired successes} minus {successes reported}. When successes
250         reported == desired, wake up the remaining threads and tell
251         them to quit.
252
253         Should be used in a "with" block.
254         """
255         def __init__(self, todo):
256             self._todo = todo
257             self._done = 0
258             self._response = None
259             self._todo_lock = threading.Semaphore(todo)
260             self._done_lock = threading.Lock()
261
262         def __enter__(self):
263             self._todo_lock.acquire()
264             return self
265
266         def __exit__(self, type, value, traceback):
267             self._todo_lock.release()
268
269         def shall_i_proceed(self):
270             """
271             Return true if the current thread should do stuff. Return
272             false if the current thread should just stop.
273             """
274             with self._done_lock:
275                 return (self._done < self._todo)
276
277         def save_response(self, response_body, replicas_stored):
278             """
279             Records a response body (a locator, possibly signed) returned by
280             the Keep server.  It is not necessary to save more than
281             one response, since we presume that any locator returned
282             in response to a successful request is valid.
283             """
284             with self._done_lock:
285                 self._done += replicas_stored
286                 self._response = response_body
287
288         def response(self):
289             """
290             Returns the body from the response to a PUT request.
291             """
292             with self._done_lock:
293                 return self._response
294
295         def done(self):
296             """
297             Return how many successes were reported.
298             """
299             with self._done_lock:
300                 return self._done
301
302
303     class KeepService(object):
304         # Make requests to a single Keep service, and track results.
305         HTTP_ERRORS = (requests.exceptions.RequestException,
306                        socket.error, ssl.SSLError)
307
308         def __init__(self, root, session, **headers):
309             self.root = root
310             self.last_result = None
311             self.success_flag = None
312             self.session = session
313             self.get_headers = {'Accept': 'application/octet-stream'}
314             self.get_headers.update(headers)
315             self.put_headers = headers
316
317         def usable(self):
318             return self.success_flag is not False
319
320         def finished(self):
321             return self.success_flag is not None
322
323         def last_status(self):
324             try:
325                 return self.last_result.status_code
326             except AttributeError:
327                 return None
328
329         def get(self, locator, timeout=None):
330             # locator is a KeepLocator object.
331             url = self.root + str(locator)
332             _logger.debug("Request: GET %s", url)
333             try:
334                 with timer.Timer() as t:
335                     result = self.session.get(url.encode('utf-8'),
336                                           headers=self.get_headers,
337                                           timeout=timeout)
338             except self.HTTP_ERRORS as e:
339                 _logger.debug("Request fail: GET %s => %s: %s",
340                               url, type(e), str(e))
341                 self.last_result = e
342             else:
343                 self.last_result = result
344                 self.success_flag = retry.check_http_response_success(result)
345                 content = result.content
346                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
347                              self.last_status(), len(content), t.msecs,
348                              (len(content)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
349                 if self.success_flag:
350                     resp_md5 = hashlib.md5(content).hexdigest()
351                     if resp_md5 == locator.md5sum:
352                         return content
353                     _logger.warning("Checksum fail: md5(%s) = %s",
354                                     url, resp_md5)
355             return None
356
357         def put(self, hash_s, body, timeout=None):
358             url = self.root + hash_s
359             _logger.debug("Request: PUT %s", url)
360             try:
361                 result = self.session.put(url.encode('utf-8'),
362                                       data=body,
363                                       headers=self.put_headers,
364                                       timeout=timeout)
365             except self.HTTP_ERRORS as e:
366                 _logger.debug("Request fail: PUT %s => %s: %s",
367                               url, type(e), str(e))
368                 self.last_result = e
369             else:
370                 self.last_result = result
371                 self.success_flag = retry.check_http_response_success(result)
372             return self.success_flag
373
374
375     class KeepWriterThread(threading.Thread):
376         """
377         Write a blob of data to the given Keep server. On success, call
378         save_response() of the given ThreadLimiter to save the returned
379         locator.
380         """
381         def __init__(self, keep_service, **kwargs):
382             super(KeepClient.KeepWriterThread, self).__init__()
383             self.service = keep_service
384             self.args = kwargs
385             self._success = False
386
387         def success(self):
388             return self._success
389
390         def run(self):
391             with self.args['thread_limiter'] as limiter:
392                 if not limiter.shall_i_proceed():
393                     # My turn arrived, but the job has been done without
394                     # me.
395                     return
396                 self.run_with_limiter(limiter)
397
398         def run_with_limiter(self, limiter):
399             if self.service.finished():
400                 return
401             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
402                           str(threading.current_thread()),
403                           self.args['data_hash'],
404                           len(self.args['data']),
405                           self.args['service_root'])
406             self._success = bool(self.service.put(
407                 self.args['data_hash'],
408                 self.args['data'],
409                 timeout=self.args.get('timeout', None)))
410             status = self.service.last_status()
411             if self._success:
412                 result = self.service.last_result
413                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
414                               str(threading.current_thread()),
415                               self.args['data_hash'],
416                               len(self.args['data']),
417                               self.args['service_root'])
418                 # Tick the 'done' counter for the number of replica
419                 # reported stored by the server, for the case that
420                 # we're talking to a proxy or other backend that
421                 # stores to multiple copies for us.
422                 try:
423                     replicas_stored = int(result.headers['x-keep-replicas-stored'])
424                 except (KeyError, ValueError):
425                     replicas_stored = 1
426                 limiter.save_response(result.content.strip(), replicas_stored)
427             elif status is not None:
428                 _logger.debug("Request fail: PUT %s => %s %s",
429                               self.args['data_hash'], status,
430                               self.service.last_result.content)
431
432
433     def __init__(self, api_client=None, proxy=None,
434                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
435                  api_token=None, local_store=None, block_cache=None,
436                  num_retries=0, session=None):
437         """Initialize a new KeepClient.
438
439         Arguments:
440         :api_client:
441           The API client to use to find Keep services.  If not
442           provided, KeepClient will build one from available Arvados
443           configuration.
444
445         :proxy:
446           If specified, this KeepClient will send requests to this Keep
447           proxy.  Otherwise, KeepClient will fall back to the setting of the
448           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
449           KeepClient does not use a proxy, pass in an empty string.
450
451         :timeout:
452           The initial timeout (in seconds) for HTTP requests to Keep
453           non-proxy servers.  A tuple of two floats is interpreted as
454           (connection_timeout, read_timeout): see
455           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
456           Because timeouts are often a result of transient server load, the
457           actual connection timeout will be increased by a factor of two on
458           each retry.
459           Default: (2, 300).
460
461         :proxy_timeout:
462           The initial timeout (in seconds) for HTTP requests to
463           Keep proxies. A tuple of two floats is interpreted as
464           (connection_timeout, read_timeout). The behavior described
465           above for adjusting connection timeouts on retry also applies.
466           Default: (20, 300).
467
468         :api_token:
469           If you're not using an API client, but only talking
470           directly to a Keep proxy, this parameter specifies an API token
471           to authenticate Keep requests.  It is an error to specify both
472           api_client and api_token.  If you specify neither, KeepClient
473           will use one available from the Arvados configuration.
474
475         :local_store:
476           If specified, this KeepClient will bypass Keep
477           services, and save data to the named directory.  If unspecified,
478           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
479           environment variable.  If you want to ensure KeepClient does not
480           use local storage, pass in an empty string.  This is primarily
481           intended to mock a server for testing.
482
483         :num_retries:
484           The default number of times to retry failed requests.
485           This will be used as the default num_retries value when get() and
486           put() are called.  Default 0.
487
488         :session:
489           The requests.Session object to use for get() and put() requests.
490           Will create one if not specified.
491         """
492         self.lock = threading.Lock()
493         if proxy is None:
494             proxy = config.get('ARVADOS_KEEP_PROXY')
495         if api_token is None:
496             if api_client is None:
497                 api_token = config.get('ARVADOS_API_TOKEN')
498             else:
499                 api_token = api_client.api_token
500         elif api_client is not None:
501             raise ValueError(
502                 "can't build KeepClient with both API client and token")
503         if local_store is None:
504             local_store = os.environ.get('KEEP_LOCAL_STORE')
505
506         self.block_cache = block_cache if block_cache else KeepBlockCache()
507         self.timeout = timeout
508         self.proxy_timeout = proxy_timeout
509
510         if local_store:
511             self.local_store = local_store
512             self.get = self.local_store_get
513             self.put = self.local_store_put
514         else:
515             self.num_retries = num_retries
516             self.session = session if session is not None else requests.Session()
517             if proxy:
518                 if not proxy.endswith('/'):
519                     proxy += '/'
520                 self.api_token = api_token
521                 self._gateway_services = {}
522                 self._keep_services = [{
523                     'uuid': 'proxy',
524                     '_service_root': proxy,
525                     }]
526                 self.using_proxy = True
527                 self._static_services_list = True
528             else:
529                 # It's important to avoid instantiating an API client
530                 # unless we actually need one, for testing's sake.
531                 if api_client is None:
532                     api_client = arvados.api('v1')
533                 self.api_client = api_client
534                 self.api_token = api_client.api_token
535                 self._gateway_services = {}
536                 self._keep_services = None
537                 self.using_proxy = None
538                 self._static_services_list = False
539
540     def current_timeout(self, attempt_number):
541         """Return the appropriate timeout to use for this client.
542
543         The proxy timeout setting if the backend service is currently a proxy,
544         the regular timeout setting otherwise.  The `attempt_number` indicates
545         how many times the operation has been tried already (starting from 0
546         for the first try), and scales the connection timeout portion of the
547         return value accordingly.
548
549         """
550         # TODO(twp): the timeout should be a property of a
551         # KeepService, not a KeepClient. See #4488.
552         t = self.proxy_timeout if self.using_proxy else self.timeout
553         return (t[0] * (1 << attempt_number), t[1])
554
555     def build_services_list(self, force_rebuild=False):
556         if (self._static_services_list or
557               (self._keep_services and not force_rebuild)):
558             return
559         with self.lock:
560             try:
561                 keep_services = self.api_client.keep_services().accessible()
562             except Exception:  # API server predates Keep services.
563                 keep_services = self.api_client.keep_disks().list()
564
565             accessible = keep_services.execute().get('items')
566             if not accessible:
567                 raise arvados.errors.NoKeepServersError()
568
569             # Precompute the base URI for each service.
570             for r in accessible:
571                 r['_service_root'] = "{}://[{}]:{:d}/".format(
572                     'https' if r['service_ssl_flag'] else 'http',
573                     r['service_host'],
574                     r['service_port'])
575
576             # Gateway services are only used when specified by UUID,
577             # so there's nothing to gain by filtering them by
578             # service_type.
579             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
580             _logger.debug(str(self._gateway_services))
581
582             self._keep_services = [
583                 ks for ks in accessible
584                 if ks.get('service_type') in ['disk', 'proxy']]
585             _logger.debug(str(self._keep_services))
586
587             self.using_proxy = any(ks.get('service_type') == 'proxy'
588                                    for ks in self._keep_services)
589
590     def _service_weight(self, data_hash, service_uuid):
591         """Compute the weight of a Keep service endpoint for a data
592         block with a known hash.
593
594         The weight is md5(h + u) where u is the last 15 characters of
595         the service endpoint's UUID.
596         """
597         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
598
599     def weighted_service_roots(self, locator, force_rebuild=False):
600         """Return an array of Keep service endpoints, in the order in
601         which they should be probed when reading or writing data with
602         the given hash+hints.
603         """
604         self.build_services_list(force_rebuild)
605
606         sorted_roots = []
607
608         # Use the services indicated by the given +K@... remote
609         # service hints, if any are present and can be resolved to a
610         # URI.
611         for hint in locator.hints:
612             if hint.startswith('K@'):
613                 if len(hint) == 7:
614                     sorted_roots.append(
615                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
616                 elif len(hint) == 29:
617                     svc = self._gateway_services.get(hint[2:])
618                     if svc:
619                         sorted_roots.append(svc['_service_root'])
620
621         # Sort the available local services by weight (heaviest first)
622         # for this locator, and return their service_roots (base URIs)
623         # in that order.
624         sorted_roots.extend([
625             svc['_service_root'] for svc in sorted(
626                 self._keep_services,
627                 reverse=True,
628                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
629         _logger.debug("{}: {}".format(locator, sorted_roots))
630         return sorted_roots
631
632     def map_new_services(self, roots_map, locator, force_rebuild, **headers):
633         # roots_map is a dictionary, mapping Keep service root strings
634         # to KeepService objects.  Poll for Keep services, and add any
635         # new ones to roots_map.  Return the current list of local
636         # root strings.
637         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
638         local_roots = self.weighted_service_roots(locator, force_rebuild)
639         for root in local_roots:
640             if root not in roots_map:
641                 roots_map[root] = self.KeepService(root, self.session, **headers)
642         return local_roots
643
644     @staticmethod
645     def _check_loop_result(result):
646         # KeepClient RetryLoops should save results as a 2-tuple: the
647         # actual result of the request, and the number of servers available
648         # to receive the request this round.
649         # This method returns True if there's a real result, False if
650         # there are no more servers available, otherwise None.
651         if isinstance(result, Exception):
652             return None
653         result, tried_server_count = result
654         if (result is not None) and (result is not False):
655             return True
656         elif tried_server_count < 1:
657             _logger.info("No more Keep services to try; giving up")
658             return False
659         else:
660             return None
661
662     def get_from_cache(self, loc):
663         """Fetch a block only if is in the cache, otherwise return None."""
664         slot = self.block_cache.get(loc)
665         if slot is not None and slot.ready.is_set():
666             return slot.get()
667         else:
668             return None
669
670     @retry.retry_method
671     def get(self, loc_s, num_retries=None):
672         """Get data from Keep.
673
674         This method fetches one or more blocks of data from Keep.  It
675         sends a request each Keep service registered with the API
676         server (or the proxy provided when this client was
677         instantiated), then each service named in location hints, in
678         sequence.  As soon as one service provides the data, it's
679         returned.
680
681         Arguments:
682         * loc_s: A string of one or more comma-separated locators to fetch.
683           This method returns the concatenation of these blocks.
684         * num_retries: The number of times to retry GET requests to
685           *each* Keep server if it returns temporary failures, with
686           exponential backoff.  Note that, in each loop, the method may try
687           to fetch data from every available Keep service, along with any
688           that are named in location hints in the locator.  The default value
689           is set when the KeepClient is initialized.
690         """
691         if ',' in loc_s:
692             return ''.join(self.get(x) for x in loc_s.split(','))
693         locator = KeepLocator(loc_s)
694         slot, first = self.block_cache.reserve_cache(locator.md5sum)
695         if not first:
696             v = slot.get()
697             return v
698
699         # If the locator has hints specifying a prefix (indicating a
700         # remote keepproxy) or the UUID of a local gateway service,
701         # read data from the indicated service(s) instead of the usual
702         # list of local disk services.
703         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
704                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
705         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
706                            for hint in locator.hints if (
707                                    hint.startswith('K@') and
708                                    len(hint) == 29 and
709                                    self._gateway_services.get(hint[2:])
710                                    )])
711         # Map root URLs to their KeepService objects.
712         roots_map = {root: self.KeepService(root, self.session) for root in hint_roots}
713
714         # See #3147 for a discussion of the loop implementation.  Highlights:
715         # * Refresh the list of Keep services after each failure, in case
716         #   it's being updated.
717         # * Retry until we succeed, we're out of retries, or every available
718         #   service has returned permanent failure.
719         sorted_roots = []
720         roots_map = {}
721         blob = None
722         loop = retry.RetryLoop(num_retries, self._check_loop_result,
723                                backoff_start=2)
724         for tries_left in loop:
725             try:
726                 sorted_roots = self.map_new_services(
727                     roots_map, locator,
728                     force_rebuild=(tries_left < num_retries))
729             except Exception as error:
730                 loop.save_result(error)
731                 continue
732
733             # Query KeepService objects that haven't returned
734             # permanent failure, in our specified shuffle order.
735             services_to_try = [roots_map[root]
736                                for root in sorted_roots
737                                if roots_map[root].usable()]
738             for keep_service in services_to_try:
739                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
740                 if blob is not None:
741                     break
742             loop.save_result((blob, len(services_to_try)))
743
744         # Always cache the result, then return it if we succeeded.
745         slot.set(blob)
746         self.block_cache.cap_cache()
747         if loop.success():
748             return blob
749
750         # Q: Including 403 is necessary for the Keep tests to continue
751         # passing, but maybe they should expect KeepReadError instead?
752         not_founds = sum(1 for key in sorted_roots
753                          if roots_map[key].last_status() in {403, 404, 410})
754         service_errors = ((key, roots_map[key].last_result)
755                           for key in sorted_roots)
756         if not roots_map:
757             raise arvados.errors.KeepReadError(
758                 "failed to read {}: no Keep services available ({})".format(
759                     loc_s, loop.last_result()))
760         elif not_founds == len(sorted_roots):
761             raise arvados.errors.NotFoundError(
762                 "{} not found".format(loc_s), service_errors)
763         else:
764             raise arvados.errors.KeepReadError(
765                 "failed to read {}".format(loc_s), service_errors, label="service")
766
767     @retry.retry_method
768     def put(self, data, copies=2, num_retries=None):
769         """Save data in Keep.
770
771         This method will get a list of Keep services from the API server, and
772         send the data to each one simultaneously in a new thread.  Once the
773         uploads are finished, if enough copies are saved, this method returns
774         the most recent HTTP response body.  If requests fail to upload
775         enough copies, this method raises KeepWriteError.
776
777         Arguments:
778         * data: The string of data to upload.
779         * copies: The number of copies that the user requires be saved.
780           Default 2.
781         * num_retries: The number of times to retry PUT requests to
782           *each* Keep server if it returns temporary failures, with
783           exponential backoff.  The default value is set when the
784           KeepClient is initialized.
785         """
786
787         if isinstance(data, unicode):
788             data = data.encode("ascii")
789         elif not isinstance(data, str):
790             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
791
792         data_hash = hashlib.md5(data).hexdigest()
793         if copies < 1:
794             return data_hash
795         locator = KeepLocator(data_hash + '+' + str(len(data)))
796
797         headers = {}
798         if self.using_proxy:
799             # Tell the proxy how many copies we want it to store
800             headers['X-Keep-Desired-Replication'] = str(copies)
801         roots_map = {}
802         thread_limiter = KeepClient.ThreadLimiter(copies)
803         loop = retry.RetryLoop(num_retries, self._check_loop_result,
804                                backoff_start=2)
805         for tries_left in loop:
806             try:
807                 local_roots = self.map_new_services(
808                     roots_map, locator,
809                     force_rebuild=(tries_left < num_retries), **headers)
810             except Exception as error:
811                 loop.save_result(error)
812                 continue
813
814             threads = []
815             for service_root, ks in roots_map.iteritems():
816                 if ks.finished():
817                     continue
818                 t = KeepClient.KeepWriterThread(
819                     ks,
820                     data=data,
821                     data_hash=data_hash,
822                     service_root=service_root,
823                     thread_limiter=thread_limiter,
824                     timeout=self.current_timeout(num_retries-tries_left))
825                 t.start()
826                 threads.append(t)
827             for t in threads:
828                 t.join()
829             loop.save_result((thread_limiter.done() >= copies, len(threads)))
830
831         if loop.success():
832             return thread_limiter.response()
833         if not roots_map:
834             raise arvados.errors.KeepWriteError(
835                 "failed to write {}: no Keep services available ({})".format(
836                     data_hash, loop.last_result()))
837         else:
838             service_errors = ((key, roots_map[key].last_result)
839                               for key in local_roots
840                               if not roots_map[key].success_flag)
841             raise arvados.errors.KeepWriteError(
842                 "failed to write {} (wanted {} copies but wrote {})".format(
843                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
844
845     def local_store_put(self, data, copies=1, num_retries=None):
846         """A stub for put().
847
848         This method is used in place of the real put() method when
849         using local storage (see constructor's local_store argument).
850
851         copies and num_retries arguments are ignored: they are here
852         only for the sake of offering the same call signature as
853         put().
854
855         Data stored this way can be retrieved via local_store_get().
856         """
857         md5 = hashlib.md5(data).hexdigest()
858         locator = '%s+%d' % (md5, len(data))
859         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
860             f.write(data)
861         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
862                   os.path.join(self.local_store, md5))
863         return locator
864
865     def local_store_get(self, loc_s, num_retries=None):
866         """Companion to local_store_put()."""
867         try:
868             locator = KeepLocator(loc_s)
869         except ValueError:
870             raise arvados.errors.NotFoundError(
871                 "Invalid data locator: '%s'" % loc_s)
872         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
873             return ''
874         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
875             return f.read()
876
877     def is_cached(self, locator):
878         return self.block_cache.reserve_cache(expect_hash)