Closes #7235. Merge branch '7235-python-keep-client-timeout'
sdk/python/arvados/keep.py
import cStringIO
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import threading
import timer

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # A hint with no '@' makes the unpacking above raise
            # ValueError; re-raise with a more helpful message.
            raise ValueError("bad permission hint {}".format(s))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is stored as a naive UTC datetime, so compare
            # against UTC now rather than local time.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
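
    # Illustrative sketch (not executed anywhere): how a signed locator
    # string breaks apart when parsed.  The hash is md5('') and the
    # signature/timestamp are made-up placeholder values.
    #
    #   loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0'
    #                     '+A27de4432922b1dd1af137a6eede5d8f07e4ea4e3@56565656')
    #   loc.md5sum       -> 'd41d8cd98f00b204e9800998ecf8427e'
    #   loc.size         -> 0
    #   loc.perm_sig     -> '27de4432922b1dd1af137a6eede5d8f07e4ea4e3'
    #   loc.stripped()   -> 'd41d8cd98f00b204e9800998ecf8427e+0'
    #   str(loc)         -> the original locator string, rebuilt from parts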


class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)
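
    # Preferred alternative (sketch): instead of the deprecated global
    # helpers above, build a KeepClient around your own API client:
    #
    #   api = arvados.api('v1')
    #   kc = KeepClient(api_client=api)
    #   loc = kc.put('foo')
    #   data = kc.get(loc)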


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
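
    # Typical usage pattern (sketch, mirroring KeepClient.get() below):
    # the first caller to reserve a slot fetches the block and fills the
    # slot; concurrent callers get the same slot and block in slot.get()
    # until the content arrives.
    #
    #   slot, first = cache.reserve_cache(locator.md5sum)
    #   if not first:
    #       return slot.get()          # waits for the fetching thread
    #   blob = fetch_block(locator)    # hypothetical fetch helper
    #   slot.set(blob)                 # wakes any waiting callers
    #   cache.cap_cache()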


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       64 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        64 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 64, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
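
    # Both tuples are (connection_timeout, read_timeout,
    # minimum_bandwidth); see __init__'s docstring.  Illustrative
    # override (arbitrary values): tolerate a slower link by doubling
    # the read timeout and halving the bandwidth floor.
    #
    #   kc = KeepClient(timeout=(2, 128, 16384))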

    class ThreadLimiter(object):
        """Limit the number of threads writing to Keep at once.

        This ensures that no more writer threads run at once than are
        needed to achieve the desired replication level.  Once the
        desired replication level is reached, queued threads are
        instructed not to run.

        Should be used in a "with" block.
        """
        def __init__(self, want_copies, max_service_replicas):
            self._started = 0
            self._want_copies = want_copies
            self._done = 0
            self._response = None
            self._start_lock = threading.Condition()
            if (not max_service_replicas) or (max_service_replicas >= want_copies):
                max_threads = 1
            else:
                max_threads = math.ceil(float(want_copies) / max_service_replicas)
            _logger.debug("Limiter max threads is %d", max_threads)
            self._todo_lock = threading.Semaphore(max_threads)
            self._done_lock = threading.Lock()
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def set_sequence(self, sequence):
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should write to Keep.
            Return false otherwise.
            """
            with self._done_lock:
                return (self._done < self._want_copies)

        def save_response(self, response_body, replicas_stored):
            """
            Record a response body (a locator, possibly signed) returned by
            the Keep server, and the number of replicas it stored.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """Return the body from the response to a PUT request."""
            with self._done_lock:
                return self._response

        def done(self):
            """Return the total number of replicas successfully stored."""
            with self._done_lock:
                return self._done
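
        # Usage sketch (this is what KeepWriterThread.run() below does):
        # each writer thread enters the limiter, checks whether more
        # copies are still needed, and reports what its server stored.
        #
        #   with limiter:
        #       if limiter.shall_i_proceed():
        #           if service.put(data_hash, data):
        #               limiter.save_response(body, replicas_stored)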


    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] is False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except Exception:
                # If the agent can't be reset or returned to the pool,
                # just discard it.
                ua.close()

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: GET %s", url)
            curl = self._get_user_agent()
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            try:
                self._headers = {}
                body_reader = cStringIO.StringIO(body)
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
                ok = False
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
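
        # How a timeout tuple maps onto cURL options (worked example with
        # DEFAULT_TIMEOUT = (2, 64, 32768)):
        #   CONNECTTIMEOUT_MS = 2000   -> give up connecting after 2s
        #   LOW_SPEED_TIME    = 64     -> abort if the transfer rate stays...
        #   LOW_SPEED_LIMIT   = 32768  -> ...below 32768 bytes/sec for 64s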

        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server.  On success, call
        save_response() on the given ThreadLimiter to record the returned
        locator.
        """
        def __init__(self, keep_service, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.service = keep_service
            self.args = kwargs
            self._success = False

        def success(self):
            return self._success

        def run(self):
            limiter = self.args['thread_limiter']
            sequence = self.args['thread_sequence']
            if sequence is not None:
                limiter.set_sequence(sequence)
            with limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            if self.service.finished():
                return
            _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            self._success = bool(self.service.put(
                self.args['data_hash'],
                self.args['data'],
                timeout=self.args.get('timeout', None)))
            result = self.service.last_result()
            if self._success:
                _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                              str(threading.current_thread()),
                              self.args['data_hash'],
                              len(self.args['data']),
                              self.args['service_root'])
                # Tick the 'done' counter for the number of replicas the
                # server reports it stored, to handle the case where we're
                # talking to a proxy or other backend that stores multiple
                # copies for us.
                try:
                    replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                except (KeyError, ValueError):
                    replicas_stored = 1
                limiter.save_response(result['body'].strip(), replicas_stored)
            elif result.get('status_code', None):
                _logger.debug("Request fail: PUT %s => %s %s",
                              self.args['data_hash'],
                              result['status_code'],
                              result['body'])


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth); a tuple
          of two floats uses the default minimum_bandwidth.  A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of
          read_timeout seconds.  Because timeouts are often a result of
          transient server load, the actual connection timeout is doubled
          on each retry.
          Default: (2, 64, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 64, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
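
    # Construction sketches (hostname and token below are placeholders,
    # not real endpoints):
    #
    #   kc = KeepClient(api_client=arvados.api('v1'))        # normal use
    #   kc = KeepClient(api_token='xyzzy',
    #                   proxy='https://keep.example.com/')    # proxy only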

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1.
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()

    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
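
    # This is rendezvous (highest-random-weight) hashing: each
    # (block, service) pair gets a stable pseudo-random weight from
    # _service_weight(), so every block sees its own consistent probe
    # order, and adding or removing a service only reshuffles the blocks
    # that map to it.  Sketch of the ordering step, with made-up UUIDs:
    #
    #   weights = {uuid: self._service_weight(locator.md5sum, uuid)
    #              for uuid in ('zzzzz-bi6l4-000000000000000',
    #                           'zzzzz-bi6l4-111111111111111')}
    #   # services are probed from the heaviest digest downward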

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool, **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache; otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service named in the locator's
        hints, then to each service registered with the API server (or
        to the proxy provided when this client was instantiated), in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x, num_retries=num_retries)
                           for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        slot, first = self.block_cache.reserve_cache(locator.md5sum)
        if not first:
            v = slot.get()
            return v

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['https://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                                   )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        slot.set(blob)
        self.block_cache.cap_cache()
        if loop.success():
            return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server,
        and send the data to them in parallel writer threads; a
        ThreadLimiter caps how many writes run at once, and stops queued
        writers once enough copies have been stored.  When the uploads
        finish, if enough copies are saved, this method returns the most
        recent HTTP response body.  If requests fail to upload enough
        copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            thread_limiter = KeepClient.ThreadLimiter(
                copies, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
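
    # Round-trip sketch (the API host and token come from your Arvados
    # configuration; 'foo' is arbitrary test data):
    #
    #   kc = KeepClient(api_client=arvados.api('v1'))
    #   loc = kc.put('foo', copies=2)  # e.g. 'acbd...+3+A<sig>@<expiry>'
    #   kc.get(loc)                    # -> 'foo'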

    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()
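
    # Testing sketch: with local_store set here (or $KEEP_LOCAL_STORE in
    # the environment), get() and put() are replaced by the file-backed
    # stubs above.  The directory path is arbitrary:
    #
    #   kc = KeepClient(local_store='/tmp/keep_test')
    #   loc = kc.put('foo')  # writes /tmp/keep_test/acbd18db4cc2f85cedef654fccc4a4d8
    #   kc.get(loc)          # -> 'foo'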

    def is_cached(self, locator):
        return self.block_cache.get(locator) is not None