# 8784: Fix service discovery race in tests (send SIGHUP to keep-web).
# [arvados.git] / sdk / python / arvados / keep.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 from future.utils import native_str
5 standard_library.install_aliases()
6 from builtins import next
7 from builtins import str
8 from builtins import range
9 from builtins import object
10 import collections
11 import datetime
12 import hashlib
13 import io
14 import logging
15 import math
16 import os
17 import pycurl
18 import queue
19 import re
20 import socket
21 import ssl
22 import sys
23 import threading
24 from . import timer
25 import urllib.parse
26
27 if sys.version_info >= (3, 0):
28     from io import BytesIO
29 else:
30     from cStringIO import StringIO as BytesIO
31
32 import arvados
33 import arvados.config as config
34 import arvados.errors
35 import arvados.retry as retry
36 import arvados.util
37
38 _logger = logging.getLogger('arvados.keep')
39 global_client_object = None
40
41
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _tcp_const, _tcp_value in (('TCP_KEEPALIVE', 0x010),
                                   ('TCP_KEEPINTVL', 0x101),
                                   ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _tcp_const):
            setattr(socket, _tcp_const, _tcp_value)
51
52
class KeepLocator(object):
    """Parse, validate, and re-serialize a Keep block locator.

    A locator string looks like
    "md5sum[+size][+Asignature@timestamp][+otherhint...]".  The md5sum
    and optional size are parsed into attributes; an "A..." hint is
    parsed into a permission signature and expiry time; any other hints
    are kept verbatim in self.hints.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, skipping components that are unset.
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed (md5sum, plus size if known)."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<expiry>" hint string, or None if no signature is set."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an "A<sig>@<expiry>" hint into perm_sig and perm_expiry.

        Raises ValueError if the hint is malformed.
        """
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # A hint with no '@' makes the tuple unpacking above raise
            # ValueError (split returns a 1-element list), not IndexError,
            # so catch both and report which hint was bad.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        Compares against as_of_dt when given; otherwise against the
        current UTC time.  perm_expiry is built with utcfromtimestamp(),
        so comparing it with naive local time (datetime.now()) would be
        wrong by the local UTC offset.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
135
136
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it when config changes."""
        global global_client_object
        # KeepClient used to adapt to these settings at runtime; emulate
        # that by constructing a fresh client whenever any of them differs
        # from the values seen on the previous call.
        current_key = (config.get('ARVADOS_API_HOST'),
                       config.get('ARVADOS_API_TOKEN'),
                       config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                       config.get('ARVADOS_KEEP_PROXY'),
                       config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                       os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is None or current_key != cls._last_key:
            # Build first, then record the key, so a failed construction
            # leaves the cached key unchanged and the next call retries.
            global_client_object = KeepClient()
            cls._last_key = current_key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Deprecated: fetch a block through the shared KeepClient."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Deprecated: store data through the shared KeepClient."""
        return Keep.global_client_object().put(data, **kwargs)
171
class KeepBlockCache(object):
    """In-memory LRU cache of Keep blocks, bounded by total byte size."""

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []          # most-recently-used slot first
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry; readers block on `ready` until a writer fills it."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the slot is filled, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Fill the slot and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Bytes held by this slot (0 while unfilled or on error)."""
            return 0 if self.content is None else len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Discard slots that finished with no content -- that means
            # the block read failed.
            self._cache = [slot for slot in self._cache
                           if not (slot.ready.is_set() and slot.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict from the tail (least recently used), but only
                # slots that are ready; unfilled slots are kept.
                for idx in range(len(self._cache) - 1, -1, -1):
                    if self._cache[idx].ready.is_set():
                        del self._cache[idx]
                        break
                total = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Linear scan; on a hit, promote the slot to the front (MRU).
        # Caller must hold self._cache_lock.
        for idx, slot in enumerate(self._cache):
            if slot.locator == locator:
                if idx != 0:
                    del self._cache[idx]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cached slot for locator, or None."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            existing = self._get(locator)
            if existing:
                return existing, False
            # No slot yet: create one and make it most-recently-used.
            slot = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, slot)
            return slot, True
243
class Counter(object):
    """Minimal thread-safe integer accumulator."""

    def __init__(self, v=0):
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically add v (may be negative) to the counter."""
        with self._lock:
            self._value += v

    def get(self):
        """Return the current counter value."""
        with self._lock:
            return self._value
256
257
258 class KeepClient(object):
259
260     # Default Keep server connection timeout:  2 seconds
261     # Default Keep server read timeout:       256 seconds
262     # Default Keep server bandwidth minimum:  32768 bytes per second
263     # Default Keep proxy connection timeout:  20 seconds
264     # Default Keep proxy read timeout:        256 seconds
265     # Default Keep proxy bandwidth minimum:   32768 bytes per second
266     DEFAULT_TIMEOUT = (2, 256, 32768)
267     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
268
269
270     class KeepService(object):
271         """Make requests to a single Keep service, and track results.
272
273         A KeepService is intended to last long enough to perform one
274         transaction (GET or PUT) against one Keep service. This can
275         involve calling either get() or put() multiple times in order
276         to retry after transient failures. However, calling both get()
277         and put() on a single instance -- or using the same instance
278         to access two different Keep services -- will not produce
279         sensible behavior.
280         """
281
282         HTTP_ERRORS = (
283             socket.error,
284             ssl.SSLError,
285             arvados.errors.HttpError,
286         )
287
288         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
289                      upload_counter=None,
290                      download_counter=None, **headers):
291             self.root = root
292             self._user_agent_pool = user_agent_pool
293             self._result = {'error': None}
294             self._usable = True
295             self._session = None
296             self._socket = None
297             self.get_headers = {'Accept': 'application/octet-stream'}
298             self.get_headers.update(headers)
299             self.put_headers = headers
300             self.upload_counter = upload_counter
301             self.download_counter = download_counter
302
303         def usable(self):
304             """Is it worth attempting a request?"""
305             return self._usable
306
307         def finished(self):
308             """Did the request succeed or encounter permanent failure?"""
309             return self._result['error'] == False or not self._usable
310
311         def last_result(self):
312             return self._result
313
314         def _get_user_agent(self):
315             try:
316                 return self._user_agent_pool.get(block=False)
317             except queue.Empty:
318                 return pycurl.Curl()
319
320         def _put_user_agent(self, ua):
321             try:
322                 ua.reset()
323                 self._user_agent_pool.put(ua, block=False)
324             except:
325                 ua.close()
326
327         def _socket_open(self, *args, **kwargs):
328             if len(args) + len(kwargs) == 2:
329                 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
330             else:
331                 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
332
333         def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
334             return self._socket_open_pycurl_7_21_5(
335                 purpose=None,
336                 address=collections.namedtuple(
337                     'Address', ['family', 'socktype', 'protocol', 'addr'],
338                 )(family, socktype, protocol, address))
339
340         def _socket_open_pycurl_7_21_5(self, purpose, address):
341             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
342             s = socket.socket(address.family, address.socktype, address.protocol)
343             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
344             # Will throw invalid protocol error on mac. This test prevents that.
345             if hasattr(socket, 'TCP_KEEPIDLE'):
346                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
347             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
348             self._socket = s
349             return s
350
351         def get(self, locator, method="GET", timeout=None):
352             # locator is a KeepLocator object.
353             url = self.root + str(locator)
354             _logger.debug("Request: %s %s", method, url)
355             curl = self._get_user_agent()
356             ok = None
357             try:
358                 with timer.Timer() as t:
359                     self._headers = {}
360                     response_body = BytesIO()
361                     curl.setopt(pycurl.NOSIGNAL, 1)
362                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
363                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
364                     curl.setopt(pycurl.URL, url.encode('utf-8'))
365                     curl.setopt(pycurl.HTTPHEADER, [
366                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
367                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
368                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
369                     if method == "HEAD":
370                         curl.setopt(pycurl.NOBODY, True)
371                     self._setcurltimeouts(curl, timeout)
372
373                     try:
374                         curl.perform()
375                     except Exception as e:
376                         raise arvados.errors.HttpError(0, str(e))
377                     finally:
378                         if self._socket:
379                             self._socket.close()
380                             self._socket = None
381                     self._result = {
382                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
383                         'body': response_body.getvalue(),
384                         'headers': self._headers,
385                         'error': False,
386                     }
387
388                 ok = retry.check_http_response_success(self._result['status_code'])
389                 if not ok:
390                     self._result['error'] = arvados.errors.HttpError(
391                         self._result['status_code'],
392                         self._headers.get('x-status-line', 'Error'))
393             except self.HTTP_ERRORS as e:
394                 self._result = {
395                     'error': e,
396                 }
397             self._usable = ok != False
398             if self._result.get('status_code', None):
399                 # The client worked well enough to get an HTTP status
400                 # code, so presumably any problems are just on the
401                 # server side and it's OK to reuse the client.
402                 self._put_user_agent(curl)
403             else:
404                 # Don't return this client to the pool, in case it's
405                 # broken.
406                 curl.close()
407             if not ok:
408                 _logger.debug("Request fail: GET %s => %s: %s",
409                               url, type(self._result['error']), str(self._result['error']))
410                 return None
411             if method == "HEAD":
412                 _logger.info("HEAD %s: %s bytes",
413                          self._result['status_code'],
414                          self._result.get('content-length'))
415                 return True
416
417             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
418                          self._result['status_code'],
419                          len(self._result['body']),
420                          t.msecs,
421                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
422
423             if self.download_counter:
424                 self.download_counter.add(len(self._result['body']))
425             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
426             if resp_md5 != locator.md5sum:
427                 _logger.warning("Checksum fail: md5(%s) = %s",
428                                 url, resp_md5)
429                 self._result['error'] = arvados.errors.HttpError(
430                     0, 'Checksum fail')
431                 return None
432             return self._result['body']
433
434         def put(self, hash_s, body, timeout=None):
435             url = self.root + hash_s
436             _logger.debug("Request: PUT %s", url)
437             curl = self._get_user_agent()
438             ok = None
439             try:
440                 with timer.Timer() as t:
441                     self._headers = {}
442                     body_reader = BytesIO(body)
443                     response_body = BytesIO()
444                     curl.setopt(pycurl.NOSIGNAL, 1)
445                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
446                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
447                     curl.setopt(pycurl.URL, url.encode('utf-8'))
448                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
449                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
450                     # response) instead of sending the request body immediately.
451                     # This allows the server to reject the request if the request
452                     # is invalid or the server is read-only, without waiting for
453                     # the client to send the entire block.
454                     curl.setopt(pycurl.UPLOAD, True)
455                     curl.setopt(pycurl.INFILESIZE, len(body))
456                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
457                     curl.setopt(pycurl.HTTPHEADER, [
458                         '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
459                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
460                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
461                     self._setcurltimeouts(curl, timeout)
462                     try:
463                         curl.perform()
464                     except Exception as e:
465                         raise arvados.errors.HttpError(0, str(e))
466                     finally:
467                         if self._socket:
468                             self._socket.close()
469                             self._socket = None
470                     self._result = {
471                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
472                         'body': response_body.getvalue().decode('utf-8'),
473                         'headers': self._headers,
474                         'error': False,
475                     }
476                 ok = retry.check_http_response_success(self._result['status_code'])
477                 if not ok:
478                     self._result['error'] = arvados.errors.HttpError(
479                         self._result['status_code'],
480                         self._headers.get('x-status-line', 'Error'))
481             except self.HTTP_ERRORS as e:
482                 self._result = {
483                     'error': e,
484                 }
485             self._usable = ok != False # still usable if ok is True or None
486             if self._result.get('status_code', None):
487                 # Client is functional. See comment in get().
488                 self._put_user_agent(curl)
489             else:
490                 curl.close()
491             if not ok:
492                 _logger.debug("Request fail: PUT %s => %s: %s",
493                               url, type(self._result['error']), str(self._result['error']))
494                 return False
495             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
496                          self._result['status_code'],
497                          len(body),
498                          t.msecs,
499                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
500             if self.upload_counter:
501                 self.upload_counter.add(len(body))
502             return True
503
504         def _setcurltimeouts(self, curl, timeouts):
505             if not timeouts:
506                 return
507             elif isinstance(timeouts, tuple):
508                 if len(timeouts) == 2:
509                     conn_t, xfer_t = timeouts
510                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
511                 else:
512                     conn_t, xfer_t, bandwidth_bps = timeouts
513             else:
514                 conn_t, xfer_t = (timeouts, timeouts)
515                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
516             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
517             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
518             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
519
520         def _headerfunction(self, header_line):
521             if isinstance(header_line, bytes):
522                 header_line = header_line.decode('iso-8859-1')
523             if ':' in header_line:
524                 name, value = header_line.split(':', 1)
525                 name = name.strip().lower()
526                 value = value.strip()
527             elif self._headers:
528                 name = self._lastheadername
529                 value = self._headers[name] + ' ' + header_line.strip()
530             elif header_line.startswith('HTTP/'):
531                 name = 'x-status-line'
532                 value = header_line
533             else:
534                 _logger.error("Unexpected header line: %s", header_line)
535                 return
536             self._lastheadername = name
537             self._headers[name] = value
538             # Returning None implies all bytes were written
539     
540
    class KeepWriterQueue(queue.Queue):
        """Work queue shared by KeepWriterThread workers writing one block.

        Each queued task is a (KeepService, service_root) pair.  The queue
        tracks how many replicas have been written so far, and how many
        write attempts are currently allowed to proceed, so that workers
        stop as soon as enough copies exist.
        """
        def __init__(self, copies):
            # :copies: number of replicas the caller wants stored.
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            # Locator string returned by the last successful write.
            self.response = None
            self.successful_copies_lock = threading.Lock()
            # Number of write attempts permitted to start right now.
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            # Record that a service stored replicas_nr copies, then wake
            # every waiting worker so they can re-check pending_copies().
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # A write attempt failed: allow one more try and wake one
            # waiting worker to pick up a replacement task.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            # Replicas still needed (may be negative if overshot).
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            """Return the next (service, service_root) task to attempt.

            Blocks while other workers' in-flight attempts might still
            satisfy the replica count; raises queue.Empty when enough
            copies are stored or no tasks remain.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        # (get_nowait() raises it once the queue is empty,
                        # and it propagates to the caller).
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # This service already succeeded or failed
                            # permanently; skip it.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        # No tasks left and no tries pending: give up.
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Tasks remain but all tries are in flight; wait
                        # for a success or failure notification.
                        self.pending_tries_notification.wait()
594
595
596     class KeepWriterThreadPool(object):
597         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
598             self.total_task_nr = 0
599             self.wanted_copies = copies
600             if (not max_service_replicas) or (max_service_replicas >= copies):
601                 num_threads = 1
602             else:
603                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
604             _logger.debug("Pool max threads is %d", num_threads)
605             self.workers = []
606             self.queue = KeepClient.KeepWriterQueue(copies)
607             # Create workers
608             for _ in range(num_threads):
609                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
610                 self.workers.append(w)
611         
612         def add_task(self, ks, service_root):
613             self.queue.put((ks, service_root))
614             self.total_task_nr += 1
615         
616         def done(self):
617             return self.queue.successful_copies
618         
619         def join(self):
620             # Start workers
621             for worker in self.workers:
622                 worker.start()
623             # Wait for finished work
624             self.queue.join()
625         
626         def response(self):
627             return self.queue.response
628     
629     
    class KeepWriterThread(threading.Thread):
        """Worker that pulls write tasks from a KeepWriterQueue."""

        # Sentinel exception *instance* shared by all threads: do_task()
        # raises it to signal an expected, already-logged failure, and
        # run() recognizes it by identity so it can skip the traceback.
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            # :queue: the shared KeepWriterQueue of (service, root) tasks.
            # :data:/:data_hash: the block body and its md5 hex digest.
            # :timeout: per-request timeout passed through to service.put().
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            # Process tasks until the queue signals there is nothing left
            # to do (enough copies written, or tasks exhausted).
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    # Identity check: TaskFailed means the failure was
                    # already logged in do_task().
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """Attempt one PUT; return (locator, replicas) or raise TaskFailed."""
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                # The server reports how many replicas it stored; assume 1
                # if the header is missing or malformed.
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored
683
684
685     def __init__(self, api_client=None, proxy=None,
686                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
687                  api_token=None, local_store=None, block_cache=None,
688                  num_retries=0, session=None):
689         """Initialize a new KeepClient.
690
691         Arguments:
692         :api_client:
693           The API client to use to find Keep services.  If not
694           provided, KeepClient will build one from available Arvados
695           configuration.
696
697         :proxy:
698           If specified, this KeepClient will send requests to this Keep
699           proxy.  Otherwise, KeepClient will fall back to the setting of the
700           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
701           If you want to KeepClient does not use a proxy, pass in an empty
702           string.
703
704         :timeout:
705           The initial timeout (in seconds) for HTTP requests to Keep
706           non-proxy servers.  A tuple of three floats is interpreted as
707           (connection_timeout, read_timeout, minimum_bandwidth). A connection
708           will be aborted if the average traffic rate falls below
709           minimum_bandwidth bytes per second over an interval of read_timeout
710           seconds. Because timeouts are often a result of transient server
711           load, the actual connection timeout will be increased by a factor
712           of two on each retry.
713           Default: (2, 256, 32768).
714
715         :proxy_timeout:
716           The initial timeout (in seconds) for HTTP requests to
717           Keep proxies. A tuple of three floats is interpreted as
718           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
719           described above for adjusting connection timeouts on retry also
720           applies.
721           Default: (20, 256, 32768).
722
723         :api_token:
724           If you're not using an API client, but only talking
725           directly to a Keep proxy, this parameter specifies an API token
726           to authenticate Keep requests.  It is an error to specify both
727           api_client and api_token.  If you specify neither, KeepClient
728           will use one available from the Arvados configuration.
729
730         :local_store:
731           If specified, this KeepClient will bypass Keep
732           services, and save data to the named directory.  If unspecified,
733           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
734           environment variable.  If you want to ensure KeepClient does not
735           use local storage, pass in an empty string.  This is primarily
736           intended to mock a server for testing.
737
738         :num_retries:
739           The default number of times to retry failed requests.
740           This will be used as the default num_retries value when get() and
741           put() are called.  Default 0.
742         """
743         self.lock = threading.Lock()
744         if proxy is None:
745             if config.get('ARVADOS_KEEP_SERVICES'):
746                 proxy = config.get('ARVADOS_KEEP_SERVICES')
747             else:
748                 proxy = config.get('ARVADOS_KEEP_PROXY')
749         if api_token is None:
750             if api_client is None:
751                 api_token = config.get('ARVADOS_API_TOKEN')
752             else:
753                 api_token = api_client.api_token
754         elif api_client is not None:
755             raise ValueError(
756                 "can't build KeepClient with both API client and token")
757         if local_store is None:
758             local_store = os.environ.get('KEEP_LOCAL_STORE')
759
760         self.block_cache = block_cache if block_cache else KeepBlockCache()
761         self.timeout = timeout
762         self.proxy_timeout = proxy_timeout
763         self._user_agent_pool = queue.LifoQueue()
764         self.upload_counter = Counter()
765         self.download_counter = Counter()
766         self.put_counter = Counter()
767         self.get_counter = Counter()
768         self.hits_counter = Counter()
769         self.misses_counter = Counter()
770
771         if local_store:
772             self.local_store = local_store
773             self.get = self.local_store_get
774             self.put = self.local_store_put
775         else:
776             self.num_retries = num_retries
777             self.max_replicas_per_service = None
778             if proxy:
779                 proxy_uris = proxy.split()
780                 for i in range(len(proxy_uris)):
781                     if not proxy_uris[i].endswith('/'):
782                         proxy_uris[i] += '/'
783                     # URL validation
784                     url = urllib.parse.urlparse(proxy_uris[i])
785                     if not (url.scheme and url.netloc):
786                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
787                 self.api_token = api_token
788                 self._gateway_services = {}
789                 self._keep_services = [{
790                     'uuid': "00000-bi6l4-%015d" % idx,
791                     'service_type': 'proxy',
792                     '_service_root': uri,
793                     } for idx, uri in enumerate(proxy_uris)]
794                 self._writable_services = self._keep_services
795                 self.using_proxy = True
796                 self._static_services_list = True
797             else:
798                 # It's important to avoid instantiating an API client
799                 # unless we actually need one, for testing's sake.
800                 if api_client is None:
801                     api_client = arvados.api('v1')
802                 self.api_client = api_client
803                 self.api_token = api_client.api_token
804                 self._gateway_services = {}
805                 self._keep_services = None
806                 self._writable_services = None
807                 self.using_proxy = None
808                 self._static_services_list = False
809
810     def current_timeout(self, attempt_number):
811         """Return the appropriate timeout to use for this client.
812
813         The proxy timeout setting if the backend service is currently a proxy,
814         the regular timeout setting otherwise.  The `attempt_number` indicates
815         how many times the operation has been tried already (starting from 0
816         for the first try), and scales the connection timeout portion of the
817         return value accordingly.
818
819         """
820         # TODO(twp): the timeout should be a property of a
821         # KeepService, not a KeepClient. See #4488.
822         t = self.proxy_timeout if self.using_proxy else self.timeout
823         if len(t) == 2:
824             return (t[0] * (1 << attempt_number), t[1])
825         else:
826             return (t[0] * (1 << attempt_number), t[1], t[2])
827     def _any_nondisk_services(self, service_list):
828         return any(ks.get('service_type', 'disk') != 'disk'
829                    for ks in service_list)
830
    def build_services_list(self, force_rebuild=False):
        """Refresh the cached Keep service lists from the API server.

        Populates self._gateway_services (all accessible services keyed
        by UUID, each annotated with a precomputed '_service_root' base
        URI), self._keep_services (non-gateway services), the
        self._writable_services subset, and self.max_replicas_per_service.

        Does nothing when the service list was fixed at construction
        time (proxy mode), or has already been built and force_rebuild
        is false.

        Raises arvados.errors.NoKeepServersError if the API server
        reports no accessible services.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        # NOTE(review): the staleness check above happens outside the
        # lock and is not repeated inside it, so concurrent callers may
        # each rebuild the list; the end state is the same either way.
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Local read candidates are everything except gateway
            # services (those are only reachable via +K@uuid hints).
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
873
874     def _service_weight(self, data_hash, service_uuid):
875         """Compute the weight of a Keep service endpoint for a data
876         block with a known hash.
877
878         The weight is md5(h + u) where u is the last 15 characters of
879         the service endpoint's UUID.
880         """
881         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
882
883     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
884         """Return an array of Keep service endpoints, in the order in
885         which they should be probed when reading or writing data with
886         the given hash+hints.
887         """
888         self.build_services_list(force_rebuild)
889
890         sorted_roots = []
891         # Use the services indicated by the given +K@... remote
892         # service hints, if any are present and can be resolved to a
893         # URI.
894         for hint in locator.hints:
895             if hint.startswith('K@'):
896                 if len(hint) == 7:
897                     sorted_roots.append(
898                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
899                 elif len(hint) == 29:
900                     svc = self._gateway_services.get(hint[2:])
901                     if svc:
902                         sorted_roots.append(svc['_service_root'])
903
904         # Sort the available local services by weight (heaviest first)
905         # for this locator, and return their service_roots (base URIs)
906         # in that order.
907         use_services = self._keep_services
908         if need_writable:
909             use_services = self._writable_services
910         self.using_proxy = self._any_nondisk_services(use_services)
911         sorted_roots.extend([
912             svc['_service_root'] for svc in sorted(
913                 use_services,
914                 reverse=True,
915                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
916         _logger.debug("{}: {}".format(locator, sorted_roots))
917         return sorted_roots
918
919     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
920         # roots_map is a dictionary, mapping Keep service root strings
921         # to KeepService objects.  Poll for Keep services, and add any
922         # new ones to roots_map.  Return the current list of local
923         # root strings.
924         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
925         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
926         for root in local_roots:
927             if root not in roots_map:
928                 roots_map[root] = self.KeepService(
929                     root, self._user_agent_pool,
930                     upload_counter=self.upload_counter,
931                     download_counter=self.download_counter,
932                     **headers)
933         return local_roots
934
935     @staticmethod
936     def _check_loop_result(result):
937         # KeepClient RetryLoops should save results as a 2-tuple: the
938         # actual result of the request, and the number of servers available
939         # to receive the request this round.
940         # This method returns True if there's a real result, False if
941         # there are no more servers available, otherwise None.
942         if isinstance(result, Exception):
943             return None
944         result, tried_server_count = result
945         if (result is not None) and (result is not False):
946             return True
947         elif tried_server_count < 1:
948             _logger.info("No more Keep services to try; giving up")
949             return False
950         else:
951             return None
952
953     def get_from_cache(self, loc):
954         """Fetch a block only if is in the cache, otherwise return None."""
955         slot = self.block_cache.get(loc)
956         if slot is not None and slot.ready.is_set():
957             return slot.get()
958         else:
959             return None
960
961     @retry.retry_method
962     def head(self, loc_s, num_retries=None):
963         return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
964
965     @retry.retry_method
966     def get(self, loc_s, num_retries=None):
967         return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
968
969     def _get_or_head(self, loc_s, method="GET", num_retries=None):
970         """Get data from Keep.
971
972         This method fetches one or more blocks of data from Keep.  It
973         sends a request each Keep service registered with the API
974         server (or the proxy provided when this client was
975         instantiated), then each service named in location hints, in
976         sequence.  As soon as one service provides the data, it's
977         returned.
978
979         Arguments:
980         * loc_s: A string of one or more comma-separated locators to fetch.
981           This method returns the concatenation of these blocks.
982         * num_retries: The number of times to retry GET requests to
983           *each* Keep server if it returns temporary failures, with
984           exponential backoff.  Note that, in each loop, the method may try
985           to fetch data from every available Keep service, along with any
986           that are named in location hints in the locator.  The default value
987           is set when the KeepClient is initialized.
988         """
989         if ',' in loc_s:
990             return ''.join(self.get(x) for x in loc_s.split(','))
991
992         self.get_counter.add(1)
993
994         locator = KeepLocator(loc_s)
995         if method == "GET":
996             slot, first = self.block_cache.reserve_cache(locator.md5sum)
997             if not first:
998                 self.hits_counter.add(1)
999                 v = slot.get()
1000                 return v
1001
1002         self.misses_counter.add(1)
1003
1004         # If the locator has hints specifying a prefix (indicating a
1005         # remote keepproxy) or the UUID of a local gateway service,
1006         # read data from the indicated service(s) instead of the usual
1007         # list of local disk services.
1008         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1009                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1010         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1011                            for hint in locator.hints if (
1012                                    hint.startswith('K@') and
1013                                    len(hint) == 29 and
1014                                    self._gateway_services.get(hint[2:])
1015                                    )])
1016         # Map root URLs to their KeepService objects.
1017         roots_map = {
1018             root: self.KeepService(root, self._user_agent_pool,
1019                                    upload_counter=self.upload_counter,
1020                                    download_counter=self.download_counter)
1021             for root in hint_roots
1022         }
1023
1024         # See #3147 for a discussion of the loop implementation.  Highlights:
1025         # * Refresh the list of Keep services after each failure, in case
1026         #   it's being updated.
1027         # * Retry until we succeed, we're out of retries, or every available
1028         #   service has returned permanent failure.
1029         sorted_roots = []
1030         roots_map = {}
1031         blob = None
1032         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1033                                backoff_start=2)
1034         for tries_left in loop:
1035             try:
1036                 sorted_roots = self.map_new_services(
1037                     roots_map, locator,
1038                     force_rebuild=(tries_left < num_retries),
1039                     need_writable=False)
1040             except Exception as error:
1041                 loop.save_result(error)
1042                 continue
1043
1044             # Query KeepService objects that haven't returned
1045             # permanent failure, in our specified shuffle order.
1046             services_to_try = [roots_map[root]
1047                                for root in sorted_roots
1048                                if roots_map[root].usable()]
1049             for keep_service in services_to_try:
1050                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1051                 if blob is not None:
1052                     break
1053             loop.save_result((blob, len(services_to_try)))
1054
1055         # Always cache the result, then return it if we succeeded.
1056         if method == "GET":
1057             slot.set(blob)
1058             self.block_cache.cap_cache()
1059         if loop.success():
1060             if method == "HEAD":
1061                 return True
1062             else:
1063                 return blob
1064
1065         # Q: Including 403 is necessary for the Keep tests to continue
1066         # passing, but maybe they should expect KeepReadError instead?
1067         not_founds = sum(1 for key in sorted_roots
1068                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1069         service_errors = ((key, roots_map[key].last_result()['error'])
1070                           for key in sorted_roots)
1071         if not roots_map:
1072             raise arvados.errors.KeepReadError(
1073                 "failed to read {}: no Keep services available ({})".format(
1074                     loc_s, loop.last_result()))
1075         elif not_founds == len(sorted_roots):
1076             raise arvados.errors.NotFoundError(
1077                 "{} not found".format(loc_s), service_errors)
1078         else:
1079             raise arvados.errors.KeepReadError(
1080                 "failed to read {}".format(loc_s), service_errors, label="service")
1081
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        # Hashing and upload need bytes; accept text for convenience.
        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Zero copies wanted: nothing to store, but still return a
            # valid locator for the data.
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # done counts replicas confirmed stored across all rounds.
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Each round only asks for the replicas still missing
            # (copies - done); uploads run in parallel threads.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
                                                        data_hash=data_hash,
                                                        copies=copies - done,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    # This service already stored a copy or failed
                    # permanently; don't ask it again.
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
1156
1157     def local_store_put(self, data, copies=1, num_retries=None):
1158         """A stub for put().
1159
1160         This method is used in place of the real put() method when
1161         using local storage (see constructor's local_store argument).
1162
1163         copies and num_retries arguments are ignored: they are here
1164         only for the sake of offering the same call signature as
1165         put().
1166
1167         Data stored this way can be retrieved via local_store_get().
1168         """
1169         md5 = hashlib.md5(data).hexdigest()
1170         locator = '%s+%d' % (md5, len(data))
1171         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1172             f.write(data)
1173         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1174                   os.path.join(self.local_store, md5))
1175         return locator
1176
1177     def local_store_get(self, loc_s, num_retries=None):
1178         """Companion to local_store_put()."""
1179         try:
1180             locator = KeepLocator(loc_s)
1181         except ValueError:
1182             raise arvados.errors.NotFoundError(
1183                 "Invalid data locator: '%s'" % loc_s)
1184         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1185             return b''
1186         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1187             return f.read()
1188
1189     def is_cached(self, locator):
1190         return self.block_cache.reserve_cache(expect_hash)