5562: Enable TCP Keepalive for Keep requests. refs #5822
[arvados.git] / sdk / python / arvados / keep.py
1 import bz2
2 import datetime
3 import fcntl
4 import functools
5 import gflags
6 import hashlib
7 import json
8 import logging
9 import os
10 import pprint
11 import pycurl
12 import Queue
13 import re
14 import socket
15 import ssl
16 import string
17 import StringIO
18 import subprocess
19 import sys
20 import threading
21 import time
22 import timer
23 import types
24 import UserDict
25 import zlib
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.retry as retry
31 import arvados.util
32
33 _logger = logging.getLogger('arvados.keep')
34 global_client_object = None
35
36
37 class KeepLocator(object):
38     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
39     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
40
41     def __init__(self, locator_str):
42         self.hints = []
43         self._perm_sig = None
44         self._perm_expiry = None
45         pieces = iter(locator_str.split('+'))
46         self.md5sum = next(pieces)
47         try:
48             self.size = int(next(pieces))
49         except StopIteration:
50             self.size = None
51         for hint in pieces:
52             if self.HINT_RE.match(hint) is None:
53                 raise ValueError("invalid hint format: {}".format(hint))
54             elif hint.startswith('A'):
55                 self.parse_permission_hint(hint)
56             else:
57                 self.hints.append(hint)
58
59     def __str__(self):
60         return '+'.join(
61             str(s) for s in [self.md5sum, self.size,
62                              self.permission_hint()] + self.hints
63             if s is not None)
64
65     def stripped(self):
66         if self.size is not None:
67             return "%s+%i" % (self.md5sum, self.size)
68         else:
69             return self.md5sum
70
71     def _make_hex_prop(name, length):
72         # Build and return a new property with the given name that
73         # must be a hex string of the given length.
74         data_name = '_{}'.format(name)
75         def getter(self):
76             return getattr(self, data_name)
77         def setter(self, hex_str):
78             if not arvados.util.is_hex(hex_str, length):
79                 raise ValueError("{} must be a {}-digit hex string: {}".
80                                  format(name, length, hex_str))
81             setattr(self, data_name, hex_str)
82         return property(getter, setter)
83
84     md5sum = _make_hex_prop('md5sum', 32)
85     perm_sig = _make_hex_prop('perm_sig', 40)
86
87     @property
88     def perm_expiry(self):
89         return self._perm_expiry
90
91     @perm_expiry.setter
92     def perm_expiry(self, value):
93         if not arvados.util.is_hex(value, 1, 8):
94             raise ValueError(
95                 "permission timestamp must be a hex Unix timestamp: {}".
96                 format(value))
97         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
98
99     def permission_hint(self):
100         data = [self.perm_sig, self.perm_expiry]
101         if None in data:
102             return None
103         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
104         return "A{}@{:08x}".format(*data)
105
106     def parse_permission_hint(self, s):
107         try:
108             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
109         except IndexError:
110             raise ValueError("bad permission hint {}".format(s))
111
112     def permission_expired(self, as_of_dt=None):
113         if self.perm_expiry is None:
114             return False
115         elif as_of_dt is None:
116             as_of_dt = datetime.datetime.now()
117         return self.perm_expiry <= as_of_dt
118
119
120 class Keep(object):
121     """Simple interface to a global KeepClient object.
122
123     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
124     own API client.  The global KeepClient will build an API client from the
125     current Arvados configuration, which may not match the one you built.
126     """
127     _last_key = None
128
129     @classmethod
130     def global_client_object(cls):
131         global global_client_object
132         # Previously, KeepClient would change its behavior at runtime based
133         # on these configuration settings.  We simulate that behavior here
134         # by checking the values and returning a new KeepClient if any of
135         # them have changed.
136         key = (config.get('ARVADOS_API_HOST'),
137                config.get('ARVADOS_API_TOKEN'),
138                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139                config.get('ARVADOS_KEEP_PROXY'),
140                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141                os.environ.get('KEEP_LOCAL_STORE'))
142         if (global_client_object is None) or (cls._last_key != key):
143             global_client_object = KeepClient()
144             cls._last_key = key
145         return global_client_object
146
147     @staticmethod
148     def get(locator, **kwargs):
149         return Keep.global_client_object().get(locator, **kwargs)
150
151     @staticmethod
152     def put(data, **kwargs):
153         return Keep.global_client_object().put(data, **kwargs)
154
155 class KeepBlockCache(object):
156     # Default RAM cache is 256MiB
157     def __init__(self, cache_max=(256 * 1024 * 1024)):
158         self.cache_max = cache_max
159         self._cache = []
160         self._cache_lock = threading.Lock()
161
162     class CacheSlot(object):
163         def __init__(self, locator):
164             self.locator = locator
165             self.ready = threading.Event()
166             self.content = None
167
168         def get(self):
169             self.ready.wait()
170             return self.content
171
172         def set(self, value):
173             self.content = value
174             self.ready.set()
175
176         def size(self):
177             if self.content is None:
178                 return 0
179             else:
180                 return len(self.content)
181
182     def cap_cache(self):
183         '''Cap the cache size to self.cache_max'''
184         with self._cache_lock:
185             # Select all slots except those where ready.is_set() and content is
186             # None (that means there was an error reading the block).
187             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
188             sm = sum([slot.size() for slot in self._cache])
189             while len(self._cache) > 0 and sm > self.cache_max:
190                 for i in xrange(len(self._cache)-1, -1, -1):
191                     if self._cache[i].ready.is_set():
192                         del self._cache[i]
193                         break
194                 sm = sum([slot.size() for slot in self._cache])
195
196     def _get(self, locator):
197         # Test if the locator is already in the cache
198         for i in xrange(0, len(self._cache)):
199             if self._cache[i].locator == locator:
200                 n = self._cache[i]
201                 if i != 0:
202                     # move it to the front
203                     del self._cache[i]
204                     self._cache.insert(0, n)
205                 return n
206         return None
207
208     def get(self, locator):
209         with self._cache_lock:
210             return self._get(locator)
211
212     def reserve_cache(self, locator):
213         '''Reserve a cache slot for the specified locator,
214         or return the existing slot.'''
215         with self._cache_lock:
216             n = self._get(locator)
217             if n:
218                 return n, False
219             else:
220                 # Add a new cache slot for the locator
221                 n = KeepBlockCache.CacheSlot(locator)
222                 self._cache.insert(0, n)
223                 return n, True
224
225 class KeepClient(object):
226
227     # Default Keep server connection timeout:  2 seconds
228     # Default Keep server read timeout:      300 seconds
229     # Default Keep proxy connection timeout:  20 seconds
230     # Default Keep proxy read timeout:       300 seconds
231     DEFAULT_TIMEOUT = (2, 300)
232     DEFAULT_PROXY_TIMEOUT = (20, 300)
233
234     class ThreadLimiter(object):
235         """
236         Limit the number of threads running at a given time to
237         {desired successes} minus {successes reported}. When successes
238         reported == desired, wake up the remaining threads and tell
239         them to quit.
240
241         Should be used in a "with" block.
242         """
243         def __init__(self, todo):
244             self._todo = todo
245             self._done = 0
246             self._response = None
247             self._todo_lock = threading.Semaphore(todo)
248             self._done_lock = threading.Lock()
249
250         def __enter__(self):
251             self._todo_lock.acquire()
252             return self
253
254         def __exit__(self, type, value, traceback):
255             self._todo_lock.release()
256
257         def shall_i_proceed(self):
258             """
259             Return true if the current thread should do stuff. Return
260             false if the current thread should just stop.
261             """
262             with self._done_lock:
263                 return (self._done < self._todo)
264
265         def save_response(self, response_body, replicas_stored):
266             """
267             Records a response body (a locator, possibly signed) returned by
268             the Keep server.  It is not necessary to save more than
269             one response, since we presume that any locator returned
270             in response to a successful request is valid.
271             """
272             with self._done_lock:
273                 self._done += replicas_stored
274                 self._response = response_body
275
276         def response(self):
277             """
278             Returns the body from the response to a PUT request.
279             """
280             with self._done_lock:
281                 return self._response
282
283         def done(self):
284             """
285             Return how many successes were reported.
286             """
287             with self._done_lock:
288                 return self._done
289
290
291     class KeepService(object):
292         """Make requests to a single Keep service, and track results.
293
294         A KeepService is intended to last long enough to perform one
295         transaction (GET or PUT) against one Keep service. This can
296         involve calling either get() or put() multiple times in order
297         to retry after transient failures. However, calling both get()
298         and put() on a single instance -- or using the same instance
299         to access two different Keep services -- will not produce
300         sensible behavior.
301         """
302
303         HTTP_ERRORS = (
304             socket.error,
305             ssl.SSLError,
306             arvados.errors.HttpError,
307         )
308
309         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
310             self.root = root
311             self._user_agent_pool = user_agent_pool
312             self._result = {'error': None}
313             self._usable = True
314             self._session = None
315             self.get_headers = {'Accept': 'application/octet-stream'}
316             self.get_headers.update(headers)
317             self.put_headers = headers
318
319         def usable(self):
320             """Is it worth attempting a request?"""
321             return self._usable
322
323         def finished(self):
324             """Did the request succeed or encounter permanent failure?"""
325             return self._result['error'] == False or not self._usable
326
327         def last_result(self):
328             return self._result
329
330         def _get_user_agent(self):
331             try:
332                 return self._user_agent_pool.get(False)
333             except Queue.Empty:
334                 return pycurl.Curl()
335
336         def _put_user_agent(self, ua):
337             try:
338                 ua.reset()
339                 self._user_agent_pool.put(ua, False)
340             except:
341                 ua.close()
342
343         def _socket_open(self, family, socktype, protocol, address):
344             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
345             s = socket.socket(family, socktype, protocol)
346             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
347             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
348             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
349             return s
350
351         def get(self, locator, timeout=None):
352             # locator is a KeepLocator object.
353             url = self.root + str(locator)
354             _logger.debug("Request: GET %s", url)
355             curl = self._get_user_agent()
356             try:
357                 with timer.Timer() as t:
358                     self._headers = {}
359                     response_body = StringIO.StringIO()
360                     curl.setopt(pycurl.NOSIGNAL, 1)
361                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
362                     curl.setopt(pycurl.URL, url.encode('utf-8'))
363                     curl.setopt(pycurl.HTTPHEADER, [
364                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
365                     curl.setopt(pycurl.WRITEDATA, response_body)
366                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
367                     self._setcurltimeouts(curl, timeout)
368                     try:
369                         curl.perform()
370                     except Exception as e:
371                         raise arvados.errors.HttpError(0, str(e))
372                     self._result = {
373                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
374                         'body': response_body.getvalue(),
375                         'headers': self._headers,
376                         'error': False,
377                     }
378                 ok = retry.check_http_response_success(self._result['status_code'])
379                 if not ok:
380                     self._result['error'] = arvados.errors.HttpError(
381                         self._result['status_code'],
382                         self._headers.get('x-status-line', 'Error'))
383             except self.HTTP_ERRORS as e:
384                 self._result = {
385                     'error': e,
386                 }
387                 ok = False
388             self._usable = ok != False
389             if not ok:
390                 _logger.debug("Request fail: GET %s => %s: %s",
391                               url, type(self._result['error']), str(self._result['error']))
392                 # Don't return this ua to the pool, in case it's broken.
393                 curl.close()
394                 return None
395             self._put_user_agent(curl)
396             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
397                          self._result['status_code'],
398                          len(self._result['body']),
399                          t.msecs,
400                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
401             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
402             if resp_md5 != locator.md5sum:
403                 _logger.warning("Checksum fail: md5(%s) = %s",
404                                 url, resp_md5)
405                 self._result['error'] = arvados.errors.HttpError(
406                     0, 'Checksum fail')
407                 return None
408             return self._result['body']
409
410         def put(self, hash_s, body, timeout=None):
411             url = self.root + hash_s
412             _logger.debug("Request: PUT %s", url)
413             curl = self._get_user_agent()
414             try:
415                 self._headers = {}
416                 response_body = StringIO.StringIO()
417                 curl.setopt(pycurl.NOSIGNAL, 1)
418                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
419                 curl.setopt(pycurl.URL, url.encode('utf-8'))
420                 curl.setopt(pycurl.POSTFIELDS, body)
421                 curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
422                 curl.setopt(pycurl.HTTPHEADER, [
423                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
424                 curl.setopt(pycurl.WRITEDATA, response_body)
425                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
426                 self._setcurltimeouts(curl, timeout)
427                 try:
428                     curl.perform()
429                 except Exception as e:
430                     raise arvados.errors.HttpError(0, str(e))
431                 self._result = {
432                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
433                     'body': response_body.getvalue(),
434                     'headers': self._headers,
435                     'error': False,
436                 }
437                 ok = retry.check_http_response_success(self._result['status_code'])
438                 if not ok:
439                     self._result['error'] = arvados.errors.HttpError(
440                         self._result['status_code'],
441                         self._headers.get('x-status-line', 'Error'))
442             except self.HTTP_ERRORS as e:
443                 self._result = {
444                     'error': e,
445                 }
446                 ok = False
447             self._usable = ok != False # still usable if ok is True or None
448             if not ok:
449                 _logger.debug("Request fail: PUT %s => %s: %s",
450                               url, type(self._result['error']), str(self._result['error']))
451                 # Don't return this ua to the pool, in case it's broken.
452                 curl.close()
453                 return False
454             self._put_user_agent(curl)
455             return True
456
457         def _setcurltimeouts(self, curl, timeouts):
458             if not timeouts:
459                 return
460             elif isinstance(timeouts, tuple):
461                 conn_t, xfer_t = timeouts
462             else:
463                 conn_t, xfer_t = (timeouts, timeouts)
464             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
465             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
466
467         def _headerfunction(self, header_line):
468             header_line = header_line.decode('iso-8859-1')
469             if ':' in header_line:
470                 name, value = header_line.split(':', 1)
471                 name = name.strip().lower()
472                 value = value.strip()
473             elif self._headers:
474                 name = self._lastheadername
475                 value = self._headers[name] + ' ' + header_line.strip()
476             elif header_line.startswith('HTTP/'):
477                 name = 'x-status-line'
478                 value = header_line
479             else:
480                 _logger.error("Unexpected header line: %s", header_line)
481                 return
482             self._lastheadername = name
483             self._headers[name] = value
484             # Returning None implies all bytes were written
485
486
487     class KeepWriterThread(threading.Thread):
488         """
489         Write a blob of data to the given Keep server. On success, call
490         save_response() of the given ThreadLimiter to save the returned
491         locator.
492         """
493         def __init__(self, keep_service, **kwargs):
494             super(KeepClient.KeepWriterThread, self).__init__()
495             self.service = keep_service
496             self.args = kwargs
497             self._success = False
498
499         def success(self):
500             return self._success
501
502         def run(self):
503             with self.args['thread_limiter'] as limiter:
504                 if not limiter.shall_i_proceed():
505                     # My turn arrived, but the job has been done without
506                     # me.
507                     return
508                 self.run_with_limiter(limiter)
509
510         def run_with_limiter(self, limiter):
511             if self.service.finished():
512                 return
513             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
514                           str(threading.current_thread()),
515                           self.args['data_hash'],
516                           len(self.args['data']),
517                           self.args['service_root'])
518             self._success = bool(self.service.put(
519                 self.args['data_hash'],
520                 self.args['data'],
521                 timeout=self.args.get('timeout', None)))
522             result = self.service.last_result()
523             if self._success:
524                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
525                               str(threading.current_thread()),
526                               self.args['data_hash'],
527                               len(self.args['data']),
528                               self.args['service_root'])
529                 # Tick the 'done' counter for the number of replica
530                 # reported stored by the server, for the case that
531                 # we're talking to a proxy or other backend that
532                 # stores to multiple copies for us.
533                 try:
534                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
535                 except (KeyError, ValueError):
536                     replicas_stored = 1
537                 limiter.save_response(result['body'].strip(), replicas_stored)
538             elif result.get('status_code', None):
539                 _logger.debug("Request fail: PUT %s => %s %s",
540                               self.args['data_hash'],
541                               result['status_code'],
542                               result['body'])
543
544
545     def __init__(self, api_client=None, proxy=None,
546                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
547                  api_token=None, local_store=None, block_cache=None,
548                  num_retries=0, session=None):
549         """Initialize a new KeepClient.
550
551         Arguments:
552         :api_client:
553           The API client to use to find Keep services.  If not
554           provided, KeepClient will build one from available Arvados
555           configuration.
556
557         :proxy:
558           If specified, this KeepClient will send requests to this Keep
559           proxy.  Otherwise, KeepClient will fall back to the setting of the
560           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
561           KeepClient does not use a proxy, pass in an empty string.
562
563         :timeout:
564           The initial timeout (in seconds) for HTTP requests to Keep
565           non-proxy servers.  A tuple of two floats is interpreted as
566           (connection_timeout, read_timeout): see
567           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
568           Because timeouts are often a result of transient server load, the
569           actual connection timeout will be increased by a factor of two on
570           each retry.
571           Default: (2, 300).
572
573         :proxy_timeout:
574           The initial timeout (in seconds) for HTTP requests to
575           Keep proxies. A tuple of two floats is interpreted as
576           (connection_timeout, read_timeout). The behavior described
577           above for adjusting connection timeouts on retry also applies.
578           Default: (20, 300).
579
580         :api_token:
581           If you're not using an API client, but only talking
582           directly to a Keep proxy, this parameter specifies an API token
583           to authenticate Keep requests.  It is an error to specify both
584           api_client and api_token.  If you specify neither, KeepClient
585           will use one available from the Arvados configuration.
586
587         :local_store:
588           If specified, this KeepClient will bypass Keep
589           services, and save data to the named directory.  If unspecified,
590           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
591           environment variable.  If you want to ensure KeepClient does not
592           use local storage, pass in an empty string.  This is primarily
593           intended to mock a server for testing.
594
595         :num_retries:
596           The default number of times to retry failed requests.
597           This will be used as the default num_retries value when get() and
598           put() are called.  Default 0.
599         """
600         self.lock = threading.Lock()
601         if proxy is None:
602             proxy = config.get('ARVADOS_KEEP_PROXY')
603         if api_token is None:
604             if api_client is None:
605                 api_token = config.get('ARVADOS_API_TOKEN')
606             else:
607                 api_token = api_client.api_token
608         elif api_client is not None:
609             raise ValueError(
610                 "can't build KeepClient with both API client and token")
611         if local_store is None:
612             local_store = os.environ.get('KEEP_LOCAL_STORE')
613
614         self.block_cache = block_cache if block_cache else KeepBlockCache()
615         self.timeout = timeout
616         self.proxy_timeout = proxy_timeout
617         self._user_agent_pool = Queue.LifoQueue()
618
619         if local_store:
620             self.local_store = local_store
621             self.get = self.local_store_get
622             self.put = self.local_store_put
623         else:
624             self.num_retries = num_retries
625             if proxy:
626                 if not proxy.endswith('/'):
627                     proxy += '/'
628                 self.api_token = api_token
629                 self._gateway_services = {}
630                 self._keep_services = [{
631                     'uuid': 'proxy',
632                     '_service_root': proxy,
633                     }]
634                 self.using_proxy = True
635                 self._static_services_list = True
636             else:
637                 # It's important to avoid instantiating an API client
638                 # unless we actually need one, for testing's sake.
639                 if api_client is None:
640                     api_client = arvados.api('v1')
641                 self.api_client = api_client
642                 self.api_token = api_client.api_token
643                 self._gateway_services = {}
644                 self._keep_services = None
645                 self.using_proxy = None
646                 self._static_services_list = False
647
648     def current_timeout(self, attempt_number):
649         """Return the appropriate timeout to use for this client.
650
651         The proxy timeout setting if the backend service is currently a proxy,
652         the regular timeout setting otherwise.  The `attempt_number` indicates
653         how many times the operation has been tried already (starting from 0
654         for the first try), and scales the connection timeout portion of the
655         return value accordingly.
656
657         """
658         # TODO(twp): the timeout should be a property of a
659         # KeepService, not a KeepClient. See #4488.
660         t = self.proxy_timeout if self.using_proxy else self.timeout
661         return (t[0] * (1 << attempt_number), t[1])
662
663     def build_services_list(self, force_rebuild=False):
664         if (self._static_services_list or
665               (self._keep_services and not force_rebuild)):
666             return
667         with self.lock:
668             try:
669                 keep_services = self.api_client.keep_services().accessible()
670             except Exception:  # API server predates Keep services.
671                 keep_services = self.api_client.keep_disks().list()
672
673             accessible = keep_services.execute().get('items')
674             if not accessible:
675                 raise arvados.errors.NoKeepServersError()
676
677             # Precompute the base URI for each service.
678             for r in accessible:
679                 host = r['service_host']
680                 if not host.startswith('[') and host.find(':') >= 0:
681                     # IPv6 URIs must be formatted like http://[::1]:80/...
682                     host = '[' + host + ']'
683                 r['_service_root'] = "{}://{}:{:d}/".format(
684                     'https' if r['service_ssl_flag'] else 'http',
685                     host,
686                     r['service_port'])
687
688             # Gateway services are only used when specified by UUID,
689             # so there's nothing to gain by filtering them by
690             # service_type.
691             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
692             _logger.debug(str(self._gateway_services))
693
694             self._keep_services = [
695                 ks for ks in accessible
696                 if ks.get('service_type') in ['disk', 'proxy']]
697             _logger.debug(str(self._keep_services))
698
699             self.using_proxy = any(ks.get('service_type') == 'proxy'
700                                    for ks in self._keep_services)
701
702     def _service_weight(self, data_hash, service_uuid):
703         """Compute the weight of a Keep service endpoint for a data
704         block with a known hash.
705
706         The weight is md5(h + u) where u is the last 15 characters of
707         the service endpoint's UUID.
708         """
709         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
710
711     def weighted_service_roots(self, locator, force_rebuild=False):
712         """Return an array of Keep service endpoints, in the order in
713         which they should be probed when reading or writing data with
714         the given hash+hints.
715         """
716         self.build_services_list(force_rebuild)
717
718         sorted_roots = []
719
720         # Use the services indicated by the given +K@... remote
721         # service hints, if any are present and can be resolved to a
722         # URI.
723         for hint in locator.hints:
724             if hint.startswith('K@'):
725                 if len(hint) == 7:
726                     sorted_roots.append(
727                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
728                 elif len(hint) == 29:
729                     svc = self._gateway_services.get(hint[2:])
730                     if svc:
731                         sorted_roots.append(svc['_service_root'])
732
733         # Sort the available local services by weight (heaviest first)
734         # for this locator, and return their service_roots (base URIs)
735         # in that order.
736         sorted_roots.extend([
737             svc['_service_root'] for svc in sorted(
738                 self._keep_services,
739                 reverse=True,
740                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
741         _logger.debug("{}: {}".format(locator, sorted_roots))
742         return sorted_roots
743
744     def map_new_services(self, roots_map, locator, force_rebuild, **headers):
745         # roots_map is a dictionary, mapping Keep service root strings
746         # to KeepService objects.  Poll for Keep services, and add any
747         # new ones to roots_map.  Return the current list of local
748         # root strings.
749         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
750         local_roots = self.weighted_service_roots(locator, force_rebuild)
751         for root in local_roots:
752             if root not in roots_map:
753                 roots_map[root] = self.KeepService(
754                     root, self._user_agent_pool, **headers)
755         return local_roots
756
757     @staticmethod
758     def _check_loop_result(result):
759         # KeepClient RetryLoops should save results as a 2-tuple: the
760         # actual result of the request, and the number of servers available
761         # to receive the request this round.
762         # This method returns True if there's a real result, False if
763         # there are no more servers available, otherwise None.
764         if isinstance(result, Exception):
765             return None
766         result, tried_server_count = result
767         if (result is not None) and (result is not False):
768             return True
769         elif tried_server_count < 1:
770             _logger.info("No more Keep services to try; giving up")
771             return False
772         else:
773             return None
774
775     def get_from_cache(self, loc):
776         """Fetch a block only if is in the cache, otherwise return None."""
777         slot = self.block_cache.get(loc)
778         if slot.ready.is_set():
779             return slot.get()
780         else:
781             return None
782
783     @retry.retry_method
784     def get(self, loc_s, num_retries=None):
785         """Get data from Keep.
786
787         This method fetches one or more blocks of data from Keep.  It
788         sends a request each Keep service registered with the API
789         server (or the proxy provided when this client was
790         instantiated), then each service named in location hints, in
791         sequence.  As soon as one service provides the data, it's
792         returned.
793
794         Arguments:
795         * loc_s: A string of one or more comma-separated locators to fetch.
796           This method returns the concatenation of these blocks.
797         * num_retries: The number of times to retry GET requests to
798           *each* Keep server if it returns temporary failures, with
799           exponential backoff.  Note that, in each loop, the method may try
800           to fetch data from every available Keep service, along with any
801           that are named in location hints in the locator.  The default value
802           is set when the KeepClient is initialized.
803         """
804         if ',' in loc_s:
805             return ''.join(self.get(x) for x in loc_s.split(','))
806         locator = KeepLocator(loc_s)
807         slot, first = self.block_cache.reserve_cache(locator.md5sum)
808         if not first:
809             v = slot.get()
810             return v
811
812         # If the locator has hints specifying a prefix (indicating a
813         # remote keepproxy) or the UUID of a local gateway service,
814         # read data from the indicated service(s) instead of the usual
815         # list of local disk services.
816         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
817                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
818         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
819                            for hint in locator.hints if (
820                                    hint.startswith('K@') and
821                                    len(hint) == 29 and
822                                    self._gateway_services.get(hint[2:])
823                                    )])
824         # Map root URLs to their KeepService objects.
825         roots_map = {
826             root: self.KeepService(root, self._user_agent_pool)
827             for root in hint_roots
828         }
829
830         # See #3147 for a discussion of the loop implementation.  Highlights:
831         # * Refresh the list of Keep services after each failure, in case
832         #   it's being updated.
833         # * Retry until we succeed, we're out of retries, or every available
834         #   service has returned permanent failure.
835         sorted_roots = []
836         roots_map = {}
837         blob = None
838         loop = retry.RetryLoop(num_retries, self._check_loop_result,
839                                backoff_start=2)
840         for tries_left in loop:
841             try:
842                 sorted_roots = self.map_new_services(
843                     roots_map, locator,
844                     force_rebuild=(tries_left < num_retries))
845             except Exception as error:
846                 loop.save_result(error)
847                 continue
848
849             # Query KeepService objects that haven't returned
850             # permanent failure, in our specified shuffle order.
851             services_to_try = [roots_map[root]
852                                for root in sorted_roots
853                                if roots_map[root].usable()]
854             for keep_service in services_to_try:
855                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
856                 if blob is not None:
857                     break
858             loop.save_result((blob, len(services_to_try)))
859
860         # Always cache the result, then return it if we succeeded.
861         slot.set(blob)
862         self.block_cache.cap_cache()
863         if loop.success():
864             return blob
865
866         # Q: Including 403 is necessary for the Keep tests to continue
867         # passing, but maybe they should expect KeepReadError instead?
868         not_founds = sum(1 for key in sorted_roots
869                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
870         service_errors = ((key, roots_map[key].last_result()['error'])
871                           for key in sorted_roots)
872         if not roots_map:
873             raise arvados.errors.KeepReadError(
874                 "failed to read {}: no Keep services available ({})".format(
875                     loc_s, loop.last_result()))
876         elif not_founds == len(sorted_roots):
877             raise arvados.errors.NotFoundError(
878                 "{} not found".format(loc_s), service_errors)
879         else:
880             raise arvados.errors.KeepReadError(
881                 "failed to read {}".format(loc_s), service_errors, label="service")
882
883     @retry.retry_method
884     def put(self, data, copies=2, num_retries=None):
885         """Save data in Keep.
886
887         This method will get a list of Keep services from the API server, and
888         send the data to each one simultaneously in a new thread.  Once the
889         uploads are finished, if enough copies are saved, this method returns
890         the most recent HTTP response body.  If requests fail to upload
891         enough copies, this method raises KeepWriteError.
892
893         Arguments:
894         * data: The string of data to upload.
895         * copies: The number of copies that the user requires be saved.
896           Default 2.
897         * num_retries: The number of times to retry PUT requests to
898           *each* Keep server if it returns temporary failures, with
899           exponential backoff.  The default value is set when the
900           KeepClient is initialized.
901         """
902
903         if isinstance(data, unicode):
904             data = data.encode("ascii")
905         elif not isinstance(data, str):
906             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
907
908         data_hash = hashlib.md5(data).hexdigest()
909         if copies < 1:
910             return data_hash
911         locator = KeepLocator(data_hash + '+' + str(len(data)))
912
913         headers = {}
914         if self.using_proxy:
915             # Tell the proxy how many copies we want it to store
916             headers['X-Keep-Desired-Replication'] = str(copies)
917         roots_map = {}
918         thread_limiter = KeepClient.ThreadLimiter(copies)
919         loop = retry.RetryLoop(num_retries, self._check_loop_result,
920                                backoff_start=2)
921         for tries_left in loop:
922             try:
923                 local_roots = self.map_new_services(
924                     roots_map, locator,
925                     force_rebuild=(tries_left < num_retries), **headers)
926             except Exception as error:
927                 loop.save_result(error)
928                 continue
929
930             threads = []
931             for service_root, ks in roots_map.iteritems():
932                 if ks.finished():
933                     continue
934                 t = KeepClient.KeepWriterThread(
935                     ks,
936                     data=data,
937                     data_hash=data_hash,
938                     service_root=service_root,
939                     thread_limiter=thread_limiter,
940                     timeout=self.current_timeout(num_retries-tries_left))
941                 t.start()
942                 threads.append(t)
943             for t in threads:
944                 t.join()
945             loop.save_result((thread_limiter.done() >= copies, len(threads)))
946
947         if loop.success():
948             return thread_limiter.response()
949         if not roots_map:
950             raise arvados.errors.KeepWriteError(
951                 "failed to write {}: no Keep services available ({})".format(
952                     data_hash, loop.last_result()))
953         else:
954             service_errors = ((key, roots_map[key].last_result()['error'])
955                               for key in local_roots
956                               if roots_map[key].last_result()['error'])
957             raise arvados.errors.KeepWriteError(
958                 "failed to write {} (wanted {} copies but wrote {})".format(
959                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
960
961     def local_store_put(self, data, copies=1, num_retries=None):
962         """A stub for put().
963
964         This method is used in place of the real put() method when
965         using local storage (see constructor's local_store argument).
966
967         copies and num_retries arguments are ignored: they are here
968         only for the sake of offering the same call signature as
969         put().
970
971         Data stored this way can be retrieved via local_store_get().
972         """
973         md5 = hashlib.md5(data).hexdigest()
974         locator = '%s+%d' % (md5, len(data))
975         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
976             f.write(data)
977         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
978                   os.path.join(self.local_store, md5))
979         return locator
980
981     def local_store_get(self, loc_s, num_retries=None):
982         """Companion to local_store_put()."""
983         try:
984             locator = KeepLocator(loc_s)
985         except ValueError:
986             raise arvados.errors.NotFoundError(
987                 "Invalid data locator: '%s'" % loc_s)
988         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
989             return ''
990         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
991             return f.read()
992
993     def is_cached(self, locator):
994         return self.block_cache.reserve_cache(expect_hash)