5562: Use pycurl library (instead of requests) for Keep transactions.
[arvados.git] / sdk / python / arvados / keep.py
1 import bz2
2 import datetime
3 import fcntl
4 import functools
5 import gflags
6 import hashlib
7 import json
8 import logging
9 import os
10 import pprint
11 import pycurl
12 import Queue
13 import re
14 import socket
15 import ssl
16 import string
17 import StringIO
18 import subprocess
19 import sys
20 import threading
21 import time
22 import timer
23 import types
24 import UserDict
25 import zlib
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.retry as retry
31 import arvados.util
32
33 _logger = logging.getLogger('arvados.keep')
34 global_client_object = None
35
36
37 class KeepLocator(object):
38     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
39     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
40
41     def __init__(self, locator_str):
42         self.hints = []
43         self._perm_sig = None
44         self._perm_expiry = None
45         pieces = iter(locator_str.split('+'))
46         self.md5sum = next(pieces)
47         try:
48             self.size = int(next(pieces))
49         except StopIteration:
50             self.size = None
51         for hint in pieces:
52             if self.HINT_RE.match(hint) is None:
53                 raise ValueError("invalid hint format: {}".format(hint))
54             elif hint.startswith('A'):
55                 self.parse_permission_hint(hint)
56             else:
57                 self.hints.append(hint)
58
59     def __str__(self):
60         return '+'.join(
61             str(s) for s in [self.md5sum, self.size,
62                              self.permission_hint()] + self.hints
63             if s is not None)
64
65     def stripped(self):
66         if self.size is not None:
67             return "%s+%i" % (self.md5sum, self.size)
68         else:
69             return self.md5sum
70
71     def _make_hex_prop(name, length):
72         # Build and return a new property with the given name that
73         # must be a hex string of the given length.
74         data_name = '_{}'.format(name)
75         def getter(self):
76             return getattr(self, data_name)
77         def setter(self, hex_str):
78             if not arvados.util.is_hex(hex_str, length):
79                 raise ValueError("{} must be a {}-digit hex string: {}".
80                                  format(name, length, hex_str))
81             setattr(self, data_name, hex_str)
82         return property(getter, setter)
83
84     md5sum = _make_hex_prop('md5sum', 32)
85     perm_sig = _make_hex_prop('perm_sig', 40)
86
87     @property
88     def perm_expiry(self):
89         return self._perm_expiry
90
91     @perm_expiry.setter
92     def perm_expiry(self, value):
93         if not arvados.util.is_hex(value, 1, 8):
94             raise ValueError(
95                 "permission timestamp must be a hex Unix timestamp: {}".
96                 format(value))
97         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
98
99     def permission_hint(self):
100         data = [self.perm_sig, self.perm_expiry]
101         if None in data:
102             return None
103         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
104         return "A{}@{:08x}".format(*data)
105
106     def parse_permission_hint(self, s):
107         try:
108             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
109         except IndexError:
110             raise ValueError("bad permission hint {}".format(s))
111
112     def permission_expired(self, as_of_dt=None):
113         if self.perm_expiry is None:
114             return False
115         elif as_of_dt is None:
116             as_of_dt = datetime.datetime.now()
117         return self.perm_expiry <= as_of_dt
118
119
120 class Keep(object):
121     """Simple interface to a global KeepClient object.
122
123     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
124     own API client.  The global KeepClient will build an API client from the
125     current Arvados configuration, which may not match the one you built.
126     """
127     _last_key = None
128
129     @classmethod
130     def global_client_object(cls):
131         global global_client_object
132         # Previously, KeepClient would change its behavior at runtime based
133         # on these configuration settings.  We simulate that behavior here
134         # by checking the values and returning a new KeepClient if any of
135         # them have changed.
136         key = (config.get('ARVADOS_API_HOST'),
137                config.get('ARVADOS_API_TOKEN'),
138                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139                config.get('ARVADOS_KEEP_PROXY'),
140                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141                os.environ.get('KEEP_LOCAL_STORE'))
142         if (global_client_object is None) or (cls._last_key != key):
143             global_client_object = KeepClient()
144             cls._last_key = key
145         return global_client_object
146
147     @staticmethod
148     def get(locator, **kwargs):
149         return Keep.global_client_object().get(locator, **kwargs)
150
151     @staticmethod
152     def put(data, **kwargs):
153         return Keep.global_client_object().put(data, **kwargs)
154
155 class KeepBlockCache(object):
156     # Default RAM cache is 256MiB
157     def __init__(self, cache_max=(256 * 1024 * 1024)):
158         self.cache_max = cache_max
159         self._cache = []
160         self._cache_lock = threading.Lock()
161
162     class CacheSlot(object):
163         def __init__(self, locator):
164             self.locator = locator
165             self.ready = threading.Event()
166             self.content = None
167
168         def get(self):
169             self.ready.wait()
170             return self.content
171
172         def set(self, value):
173             self.content = value
174             self.ready.set()
175
176         def size(self):
177             if self.content is None:
178                 return 0
179             else:
180                 return len(self.content)
181
182     def cap_cache(self):
183         '''Cap the cache size to self.cache_max'''
184         with self._cache_lock:
185             # Select all slots except those where ready.is_set() and content is
186             # None (that means there was an error reading the block).
187             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
188             sm = sum([slot.size() for slot in self._cache])
189             while len(self._cache) > 0 and sm > self.cache_max:
190                 for i in xrange(len(self._cache)-1, -1, -1):
191                     if self._cache[i].ready.is_set():
192                         del self._cache[i]
193                         break
194                 sm = sum([slot.size() for slot in self._cache])
195
196     def _get(self, locator):
197         # Test if the locator is already in the cache
198         for i in xrange(0, len(self._cache)):
199             if self._cache[i].locator == locator:
200                 n = self._cache[i]
201                 if i != 0:
202                     # move it to the front
203                     del self._cache[i]
204                     self._cache.insert(0, n)
205                 return n
206         return None
207
208     def get(self, locator):
209         with self._cache_lock:
210             return self._get(locator)
211
212     def reserve_cache(self, locator):
213         '''Reserve a cache slot for the specified locator,
214         or return the existing slot.'''
215         with self._cache_lock:
216             n = self._get(locator)
217             if n:
218                 return n, False
219             else:
220                 # Add a new cache slot for the locator
221                 n = KeepBlockCache.CacheSlot(locator)
222                 self._cache.insert(0, n)
223                 return n, True
224
225 class KeepClient(object):
226
227     # Default Keep server connection timeout:  2 seconds
228     # Default Keep server read timeout:      300 seconds
229     # Default Keep proxy connection timeout:  20 seconds
230     # Default Keep proxy read timeout:       300 seconds
231     DEFAULT_TIMEOUT = (2, 300)
232     DEFAULT_PROXY_TIMEOUT = (20, 300)
233
234     class ThreadLimiter(object):
235         """
236         Limit the number of threads running at a given time to
237         {desired successes} minus {successes reported}. When successes
238         reported == desired, wake up the remaining threads and tell
239         them to quit.
240
241         Should be used in a "with" block.
242         """
243         def __init__(self, todo):
244             self._todo = todo
245             self._done = 0
246             self._response = None
247             self._todo_lock = threading.Semaphore(todo)
248             self._done_lock = threading.Lock()
249
250         def __enter__(self):
251             self._todo_lock.acquire()
252             return self
253
254         def __exit__(self, type, value, traceback):
255             self._todo_lock.release()
256
257         def shall_i_proceed(self):
258             """
259             Return true if the current thread should do stuff. Return
260             false if the current thread should just stop.
261             """
262             with self._done_lock:
263                 return (self._done < self._todo)
264
265         def save_response(self, response_body, replicas_stored):
266             """
267             Records a response body (a locator, possibly signed) returned by
268             the Keep server.  It is not necessary to save more than
269             one response, since we presume that any locator returned
270             in response to a successful request is valid.
271             """
272             with self._done_lock:
273                 self._done += replicas_stored
274                 self._response = response_body
275
276         def response(self):
277             """
278             Returns the body from the response to a PUT request.
279             """
280             with self._done_lock:
281                 return self._response
282
283         def done(self):
284             """
285             Return how many successes were reported.
286             """
287             with self._done_lock:
288                 return self._done
289
290
291     class KeepService(object):
292         """Make requests to a single Keep service, and track results.
293
294         A KeepService is intended to last long enough to perform one
295         transaction (GET or PUT) against one Keep service. This can
296         involve calling either get() or put() multiple times in order
297         to retry after transient failures. However, calling both get()
298         and put() on a single instance -- or using the same instance
299         to access two different Keep services -- will not produce
300         sensible behavior.
301         """
302
303         HTTP_ERRORS = (
304             socket.error,
305             ssl.SSLError,
306             arvados.errors.HttpError,
307         )
308
309         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
310             self.root = root
311             self._user_agent_pool = user_agent_pool
312             self._result = {'error': None}
313             self._usable = True
314             self._session = None
315             self.get_headers = {'Accept': 'application/octet-stream'}
316             self.get_headers.update(headers)
317             self.put_headers = headers
318
319         def usable(self):
320             """Is it worth attempting a request?"""
321             return self._usable
322
323         def finished(self):
324             """Did the request succeed or encounter permanent failure?"""
325             return self._result['error'] == False or not self._usable
326
327         def last_result(self):
328             return self._result
329
330         def _get_user_agent(self):
331             try:
332                 return self._user_agent_pool.get(False)
333             except Queue.Empty:
334                 return pycurl.Curl()
335
336         def _put_user_agent(self, ua):
337             try:
338                 ua.reset()
339                 self._user_agent_pool.put(ua, False)
340             except:
341                 ua.close()
342
343         def get(self, locator, timeout=None):
344             # locator is a KeepLocator object.
345             url = self.root + str(locator)
346             _logger.debug("Request: GET %s", url)
347             curl = self._get_user_agent()
348             try:
349                 with timer.Timer() as t:
350                     self._headers = {}
351                     response_body = StringIO.StringIO()
352                     curl.setopt(pycurl.NOSIGNAL, 1)
353                     curl.setopt(pycurl.URL, url.encode('utf-8'))
354                     curl.setopt(pycurl.HTTPHEADER, [
355                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
356                     curl.setopt(pycurl.WRITEDATA, response_body)
357                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
358                     self._setcurltimeouts(curl, timeout)
359                     try:
360                         curl.perform()
361                     except Exception as e:
362                         raise arvados.errors.HttpError(0, str(e))
363                     self._result = {
364                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
365                         'body': response_body.getvalue(),
366                         'headers': self._headers,
367                         'error': False,
368                     }
369                 ok = retry.check_http_response_success(self._result['status_code'])
370                 if not ok:
371                     self._result['error'] = arvados.errors.HttpError(
372                         self._result['status_code'],
373                         self._headers.get('x-status-line', 'Error'))
374             except self.HTTP_ERRORS as e:
375                 self._result = {
376                     'error': e,
377                 }
378                 ok = False
379             self._usable = ok != False
380             if not ok:
381                 _logger.debug("Request fail: GET %s => %s: %s",
382                               url, type(self._result['error']), str(self._result['error']))
383                 # Don't return this ua to the pool, in case it's broken.
384                 curl.close()
385                 return None
386             self._put_user_agent(curl)
387             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
388                          self._result['status_code'],
389                          len(self._result['body']),
390                          t.msecs,
391                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
392             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
393             if resp_md5 != locator.md5sum:
394                 _logger.warning("Checksum fail: md5(%s) = %s",
395                                 url, resp_md5)
396                 self._result['error'] = arvados.errors.HttpError(
397                     0, 'Checksum fail')
398                 return None
399             return self._result['body']
400
401         def put(self, hash_s, body, timeout=None):
402             url = self.root + hash_s
403             _logger.debug("Request: PUT %s", url)
404             curl = self._get_user_agent()
405             try:
406                 self._headers = {}
407                 response_body = StringIO.StringIO()
408                 curl.setopt(pycurl.NOSIGNAL, 1)
409                 curl.setopt(pycurl.URL, url.encode('utf-8'))
410                 curl.setopt(pycurl.POSTFIELDS, body)
411                 curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
412                 curl.setopt(pycurl.HTTPHEADER, [
413                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
414                 curl.setopt(pycurl.WRITEDATA, response_body)
415                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
416                 self._setcurltimeouts(curl, timeout)
417                 try:
418                     curl.perform()
419                 except Exception as e:
420                     raise arvados.errors.HttpError(0, str(e))
421                 self._result = {
422                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
423                     'body': response_body.getvalue(),
424                     'headers': self._headers,
425                     'error': False,
426                 }
427                 ok = retry.check_http_response_success(self._result['status_code'])
428                 if not ok:
429                     self._result['error'] = arvados.errors.HttpError(
430                         self._result['status_code'],
431                         self._headers.get('x-status-line', 'Error'))
432             except self.HTTP_ERRORS as e:
433                 self._result = {
434                     'error': e,
435                 }
436                 ok = False
437             self._usable = ok != False # still usable if ok is True or None
438             if not ok:
439                 _logger.debug("Request fail: PUT %s => %s: %s",
440                               url, type(self._result['error']), str(self._result['error']))
441                 # Don't return this ua to the pool, in case it's broken.
442                 curl.close()
443                 return False
444             self._put_user_agent(curl)
445             return True
446
447         def _setcurltimeouts(self, curl, timeouts):
448             if not timeouts:
449                 return
450             elif isinstance(timeouts, tuple):
451                 conn_t, xfer_t = timeouts
452             else:
453                 conn_t, xfer_t = (timeouts, timeouts)
454             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
455             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
456
457         def _headerfunction(self, header_line):
458             header_line = header_line.decode('iso-8859-1')
459             if ':' in header_line:
460                 name, value = header_line.split(':', 1)
461                 name = name.strip().lower()
462                 value = value.strip()
463             elif self._headers:
464                 name = self._lastheadername
465                 value = self._headers[name] + ' ' + header_line.strip()
466             elif header_line.startswith('HTTP/'):
467                 name = 'x-status-line'
468                 value = header_line
469             else:
470                 _logger.error("Unexpected header line: %s", header_line)
471                 return
472             self._lastheadername = name
473             self._headers[name] = value
474             # Returning None implies all bytes were written
475
476
477     class KeepWriterThread(threading.Thread):
478         """
479         Write a blob of data to the given Keep server. On success, call
480         save_response() of the given ThreadLimiter to save the returned
481         locator.
482         """
483         def __init__(self, keep_service, **kwargs):
484             super(KeepClient.KeepWriterThread, self).__init__()
485             self.service = keep_service
486             self.args = kwargs
487             self._success = False
488
489         def success(self):
490             return self._success
491
492         def run(self):
493             with self.args['thread_limiter'] as limiter:
494                 if not limiter.shall_i_proceed():
495                     # My turn arrived, but the job has been done without
496                     # me.
497                     return
498                 self.run_with_limiter(limiter)
499
500         def run_with_limiter(self, limiter):
501             if self.service.finished():
502                 return
503             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
504                           str(threading.current_thread()),
505                           self.args['data_hash'],
506                           len(self.args['data']),
507                           self.args['service_root'])
508             self._success = bool(self.service.put(
509                 self.args['data_hash'],
510                 self.args['data'],
511                 timeout=self.args.get('timeout', None)))
512             result = self.service.last_result()
513             if self._success:
514                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
515                               str(threading.current_thread()),
516                               self.args['data_hash'],
517                               len(self.args['data']),
518                               self.args['service_root'])
519                 # Tick the 'done' counter for the number of replica
520                 # reported stored by the server, for the case that
521                 # we're talking to a proxy or other backend that
522                 # stores to multiple copies for us.
523                 try:
524                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
525                 except (KeyError, ValueError):
526                     replicas_stored = 1
527                 limiter.save_response(result['body'].strip(), replicas_stored)
528             elif result.get('status_code', None):
529                 _logger.debug("Request fail: PUT %s => %s %s",
530                               self.args['data_hash'],
531                               result['status_code'],
532                               result['body'])
533
534
535     def __init__(self, api_client=None, proxy=None,
536                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
537                  api_token=None, local_store=None, block_cache=None,
538                  num_retries=0, session=None):
539         """Initialize a new KeepClient.
540
541         Arguments:
542         :api_client:
543           The API client to use to find Keep services.  If not
544           provided, KeepClient will build one from available Arvados
545           configuration.
546
547         :proxy:
548           If specified, this KeepClient will send requests to this Keep
549           proxy.  Otherwise, KeepClient will fall back to the setting of the
550           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
551           KeepClient does not use a proxy, pass in an empty string.
552
553         :timeout:
554           The initial timeout (in seconds) for HTTP requests to Keep
555           non-proxy servers.  A tuple of two floats is interpreted as
556           (connection_timeout, read_timeout): see
557           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
558           Because timeouts are often a result of transient server load, the
559           actual connection timeout will be increased by a factor of two on
560           each retry.
561           Default: (2, 300).
562
563         :proxy_timeout:
564           The initial timeout (in seconds) for HTTP requests to
565           Keep proxies. A tuple of two floats is interpreted as
566           (connection_timeout, read_timeout). The behavior described
567           above for adjusting connection timeouts on retry also applies.
568           Default: (20, 300).
569
570         :api_token:
571           If you're not using an API client, but only talking
572           directly to a Keep proxy, this parameter specifies an API token
573           to authenticate Keep requests.  It is an error to specify both
574           api_client and api_token.  If you specify neither, KeepClient
575           will use one available from the Arvados configuration.
576
577         :local_store:
578           If specified, this KeepClient will bypass Keep
579           services, and save data to the named directory.  If unspecified,
580           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
581           environment variable.  If you want to ensure KeepClient does not
582           use local storage, pass in an empty string.  This is primarily
583           intended to mock a server for testing.
584
585         :num_retries:
586           The default number of times to retry failed requests.
587           This will be used as the default num_retries value when get() and
588           put() are called.  Default 0.
589         """
590         self.lock = threading.Lock()
591         if proxy is None:
592             proxy = config.get('ARVADOS_KEEP_PROXY')
593         if api_token is None:
594             if api_client is None:
595                 api_token = config.get('ARVADOS_API_TOKEN')
596             else:
597                 api_token = api_client.api_token
598         elif api_client is not None:
599             raise ValueError(
600                 "can't build KeepClient with both API client and token")
601         if local_store is None:
602             local_store = os.environ.get('KEEP_LOCAL_STORE')
603
604         self.block_cache = block_cache if block_cache else KeepBlockCache()
605         self.timeout = timeout
606         self.proxy_timeout = proxy_timeout
607         self._user_agent_pool = Queue.LifoQueue()
608
609         if local_store:
610             self.local_store = local_store
611             self.get = self.local_store_get
612             self.put = self.local_store_put
613         else:
614             self.num_retries = num_retries
615             if proxy:
616                 if not proxy.endswith('/'):
617                     proxy += '/'
618                 self.api_token = api_token
619                 self._gateway_services = {}
620                 self._keep_services = [{
621                     'uuid': 'proxy',
622                     '_service_root': proxy,
623                     }]
624                 self.using_proxy = True
625                 self._static_services_list = True
626             else:
627                 # It's important to avoid instantiating an API client
628                 # unless we actually need one, for testing's sake.
629                 if api_client is None:
630                     api_client = arvados.api('v1')
631                 self.api_client = api_client
632                 self.api_token = api_client.api_token
633                 self._gateway_services = {}
634                 self._keep_services = None
635                 self.using_proxy = None
636                 self._static_services_list = False
637
638     def current_timeout(self, attempt_number):
639         """Return the appropriate timeout to use for this client.
640
641         The proxy timeout setting if the backend service is currently a proxy,
642         the regular timeout setting otherwise.  The `attempt_number` indicates
643         how many times the operation has been tried already (starting from 0
644         for the first try), and scales the connection timeout portion of the
645         return value accordingly.
646
647         """
648         # TODO(twp): the timeout should be a property of a
649         # KeepService, not a KeepClient. See #4488.
650         t = self.proxy_timeout if self.using_proxy else self.timeout
651         return (t[0] * (1 << attempt_number), t[1])
652
653     def build_services_list(self, force_rebuild=False):
654         if (self._static_services_list or
655               (self._keep_services and not force_rebuild)):
656             return
657         with self.lock:
658             try:
659                 keep_services = self.api_client.keep_services().accessible()
660             except Exception:  # API server predates Keep services.
661                 keep_services = self.api_client.keep_disks().list()
662
663             accessible = keep_services.execute().get('items')
664             if not accessible:
665                 raise arvados.errors.NoKeepServersError()
666
667             # Precompute the base URI for each service.
668             for r in accessible:
669                 host = r['service_host']
670                 if not host.startswith('[') and host.find(':') >= 0:
671                     # IPv6 URIs must be formatted like http://[::1]:80/...
672                     host = '[' + host + ']'
673                 r['_service_root'] = "{}://{}:{:d}/".format(
674                     'https' if r['service_ssl_flag'] else 'http',
675                     host,
676                     r['service_port'])
677
678             # Gateway services are only used when specified by UUID,
679             # so there's nothing to gain by filtering them by
680             # service_type.
681             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
682             _logger.debug(str(self._gateway_services))
683
684             self._keep_services = [
685                 ks for ks in accessible
686                 if ks.get('service_type') in ['disk', 'proxy']]
687             _logger.debug(str(self._keep_services))
688
689             self.using_proxy = any(ks.get('service_type') == 'proxy'
690                                    for ks in self._keep_services)
691
692     def _service_weight(self, data_hash, service_uuid):
693         """Compute the weight of a Keep service endpoint for a data
694         block with a known hash.
695
696         The weight is md5(h + u) where u is the last 15 characters of
697         the service endpoint's UUID.
698         """
699         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
700
701     def weighted_service_roots(self, locator, force_rebuild=False):
702         """Return an array of Keep service endpoints, in the order in
703         which they should be probed when reading or writing data with
704         the given hash+hints.
705         """
706         self.build_services_list(force_rebuild)
707
708         sorted_roots = []
709
710         # Use the services indicated by the given +K@... remote
711         # service hints, if any are present and can be resolved to a
712         # URI.
713         for hint in locator.hints:
714             if hint.startswith('K@'):
715                 if len(hint) == 7:
716                     sorted_roots.append(
717                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
718                 elif len(hint) == 29:
719                     svc = self._gateway_services.get(hint[2:])
720                     if svc:
721                         sorted_roots.append(svc['_service_root'])
722
723         # Sort the available local services by weight (heaviest first)
724         # for this locator, and return their service_roots (base URIs)
725         # in that order.
726         sorted_roots.extend([
727             svc['_service_root'] for svc in sorted(
728                 self._keep_services,
729                 reverse=True,
730                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
731         _logger.debug("{}: {}".format(locator, sorted_roots))
732         return sorted_roots
733
734     def map_new_services(self, roots_map, locator, force_rebuild, **headers):
735         # roots_map is a dictionary, mapping Keep service root strings
736         # to KeepService objects.  Poll for Keep services, and add any
737         # new ones to roots_map.  Return the current list of local
738         # root strings.
739         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
740         local_roots = self.weighted_service_roots(locator, force_rebuild)
741         for root in local_roots:
742             if root not in roots_map:
743                 roots_map[root] = self.KeepService(
744                     root, self._user_agent_pool, **headers)
745         return local_roots
746
747     @staticmethod
748     def _check_loop_result(result):
749         # KeepClient RetryLoops should save results as a 2-tuple: the
750         # actual result of the request, and the number of servers available
751         # to receive the request this round.
752         # This method returns True if there's a real result, False if
753         # there are no more servers available, otherwise None.
754         if isinstance(result, Exception):
755             return None
756         result, tried_server_count = result
757         if (result is not None) and (result is not False):
758             return True
759         elif tried_server_count < 1:
760             _logger.info("No more Keep services to try; giving up")
761             return False
762         else:
763             return None
764
765     def get_from_cache(self, loc):
766         """Fetch a block only if is in the cache, otherwise return None."""
767         slot = self.block_cache.get(loc)
768         if slot.ready.is_set():
769             return slot.get()
770         else:
771             return None
772
773     @retry.retry_method
774     def get(self, loc_s, num_retries=None):
775         """Get data from Keep.
776
777         This method fetches one or more blocks of data from Keep.  It
778         sends a request each Keep service registered with the API
779         server (or the proxy provided when this client was
780         instantiated), then each service named in location hints, in
781         sequence.  As soon as one service provides the data, it's
782         returned.
783
784         Arguments:
785         * loc_s: A string of one or more comma-separated locators to fetch.
786           This method returns the concatenation of these blocks.
787         * num_retries: The number of times to retry GET requests to
788           *each* Keep server if it returns temporary failures, with
789           exponential backoff.  Note that, in each loop, the method may try
790           to fetch data from every available Keep service, along with any
791           that are named in location hints in the locator.  The default value
792           is set when the KeepClient is initialized.
793         """
794         if ',' in loc_s:
795             return ''.join(self.get(x) for x in loc_s.split(','))
796         locator = KeepLocator(loc_s)
797         slot, first = self.block_cache.reserve_cache(locator.md5sum)
798         if not first:
799             v = slot.get()
800             return v
801
802         # If the locator has hints specifying a prefix (indicating a
803         # remote keepproxy) or the UUID of a local gateway service,
804         # read data from the indicated service(s) instead of the usual
805         # list of local disk services.
806         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
807                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
808         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
809                            for hint in locator.hints if (
810                                    hint.startswith('K@') and
811                                    len(hint) == 29 and
812                                    self._gateway_services.get(hint[2:])
813                                    )])
814         # Map root URLs to their KeepService objects.
815         roots_map = {
816             root: self.KeepService(root, self._user_agent_pool)
817             for root in hint_roots
818         }
819
820         # See #3147 for a discussion of the loop implementation.  Highlights:
821         # * Refresh the list of Keep services after each failure, in case
822         #   it's being updated.
823         # * Retry until we succeed, we're out of retries, or every available
824         #   service has returned permanent failure.
825         sorted_roots = []
826         roots_map = {}
827         blob = None
828         loop = retry.RetryLoop(num_retries, self._check_loop_result,
829                                backoff_start=2)
830         for tries_left in loop:
831             try:
832                 sorted_roots = self.map_new_services(
833                     roots_map, locator,
834                     force_rebuild=(tries_left < num_retries))
835             except Exception as error:
836                 loop.save_result(error)
837                 continue
838
839             # Query KeepService objects that haven't returned
840             # permanent failure, in our specified shuffle order.
841             services_to_try = [roots_map[root]
842                                for root in sorted_roots
843                                if roots_map[root].usable()]
844             for keep_service in services_to_try:
845                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
846                 if blob is not None:
847                     break
848             loop.save_result((blob, len(services_to_try)))
849
850         # Always cache the result, then return it if we succeeded.
851         slot.set(blob)
852         self.block_cache.cap_cache()
853         if loop.success():
854             return blob
855
856         # Q: Including 403 is necessary for the Keep tests to continue
857         # passing, but maybe they should expect KeepReadError instead?
858         not_founds = sum(1 for key in sorted_roots
859                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
860         service_errors = ((key, roots_map[key].last_result()['error'])
861                           for key in sorted_roots)
862         if not roots_map:
863             raise arvados.errors.KeepReadError(
864                 "failed to read {}: no Keep services available ({})".format(
865                     loc_s, loop.last_result()))
866         elif not_founds == len(sorted_roots):
867             raise arvados.errors.NotFoundError(
868                 "{} not found".format(loc_s), service_errors)
869         else:
870             raise arvados.errors.KeepReadError(
871                 "failed to read {}".format(loc_s), service_errors, label="service")
872
873     @retry.retry_method
874     def put(self, data, copies=2, num_retries=None):
875         """Save data in Keep.
876
877         This method will get a list of Keep services from the API server, and
878         send the data to each one simultaneously in a new thread.  Once the
879         uploads are finished, if enough copies are saved, this method returns
880         the most recent HTTP response body.  If requests fail to upload
881         enough copies, this method raises KeepWriteError.
882
883         Arguments:
884         * data: The string of data to upload.
885         * copies: The number of copies that the user requires be saved.
886           Default 2.
887         * num_retries: The number of times to retry PUT requests to
888           *each* Keep server if it returns temporary failures, with
889           exponential backoff.  The default value is set when the
890           KeepClient is initialized.
891         """
892
893         if isinstance(data, unicode):
894             data = data.encode("ascii")
895         elif not isinstance(data, str):
896             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
897
898         data_hash = hashlib.md5(data).hexdigest()
899         if copies < 1:
900             return data_hash
901         locator = KeepLocator(data_hash + '+' + str(len(data)))
902
903         headers = {}
904         if self.using_proxy:
905             # Tell the proxy how many copies we want it to store
906             headers['X-Keep-Desired-Replication'] = str(copies)
907         roots_map = {}
908         thread_limiter = KeepClient.ThreadLimiter(copies)
909         loop = retry.RetryLoop(num_retries, self._check_loop_result,
910                                backoff_start=2)
911         for tries_left in loop:
912             try:
913                 local_roots = self.map_new_services(
914                     roots_map, locator,
915                     force_rebuild=(tries_left < num_retries), **headers)
916             except Exception as error:
917                 loop.save_result(error)
918                 continue
919
920             threads = []
921             for service_root, ks in roots_map.iteritems():
922                 if ks.finished():
923                     continue
924                 t = KeepClient.KeepWriterThread(
925                     ks,
926                     data=data,
927                     data_hash=data_hash,
928                     service_root=service_root,
929                     thread_limiter=thread_limiter,
930                     timeout=self.current_timeout(num_retries-tries_left))
931                 t.start()
932                 threads.append(t)
933             for t in threads:
934                 t.join()
935             loop.save_result((thread_limiter.done() >= copies, len(threads)))
936
937         if loop.success():
938             return thread_limiter.response()
939         if not roots_map:
940             raise arvados.errors.KeepWriteError(
941                 "failed to write {}: no Keep services available ({})".format(
942                     data_hash, loop.last_result()))
943         else:
944             service_errors = ((key, roots_map[key].last_result()['error'])
945                               for key in local_roots
946                               if roots_map[key].last_result()['error'])
947             raise arvados.errors.KeepWriteError(
948                 "failed to write {} (wanted {} copies but wrote {})".format(
949                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
950
951     def local_store_put(self, data, copies=1, num_retries=None):
952         """A stub for put().
953
954         This method is used in place of the real put() method when
955         using local storage (see constructor's local_store argument).
956
957         copies and num_retries arguments are ignored: they are here
958         only for the sake of offering the same call signature as
959         put().
960
961         Data stored this way can be retrieved via local_store_get().
962         """
963         md5 = hashlib.md5(data).hexdigest()
964         locator = '%s+%d' % (md5, len(data))
965         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
966             f.write(data)
967         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
968                   os.path.join(self.local_store, md5))
969         return locator
970
971     def local_store_get(self, loc_s, num_retries=None):
972         """Companion to local_store_put()."""
973         try:
974             locator = KeepLocator(loc_s)
975         except ValueError:
976             raise arvados.errors.NotFoundError(
977                 "Invalid data locator: '%s'" % loc_s)
978         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
979             return ''
980         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
981             return f.read()
982
983     def is_cached(self, locator):
984         return self.block_cache.reserve_cache(expect_hash)