11308: Merge branch 'master' into 11308-python3
[arvados.git] / sdk / python / arvados / keep.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import next
6 from builtins import str
7 from builtins import range
8 from builtins import object
9 import collections
10 import datetime
11 import hashlib
12 import io
13 import logging
14 import math
15 import os
16 import pycurl
17 import queue
18 import re
19 import socket
20 import ssl
21 import sys
22 import threading
23 from . import timer
24 import urllib.parse
25
26 if sys.version_info >= (3, 0):
27     from io import BytesIO
28 else:
29     from cStringIO import StringIO as BytesIO
30
31 import arvados
32 import arvados.config as config
33 import arvados.errors
34 import arvados.retry as retry
35 import arvados.util
36
37 _logger = logging.getLogger('arvados.keep')
38 global_client_object = None
39
40
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Some macOS Python builds omit these constants from the socket module;
    # define them so the TCP keepalive setup in KeepService's socket-open
    # callbacks can run unchanged on darwin.
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
50
51
class KeepLocator(object):
    """Parse, validate, and re-serialize a Keep block locator.

    A locator string looks like "<md5sum>[+<size>][+<hint>...]".  A hint of
    the form "A<signature>@<hex timestamp>" is a permission hint and is
    parsed into perm_sig/perm_expiry; every other hint is kept verbatim in
    self.hints.  str() round-trips the locator back to canonical form.
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str; raises ValueError on malformed input."""
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Locator consists of the md5sum alone.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints ("md5sum+size")."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<expiry>" hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        # Serialize the expiry back to a zero-padded hex Unix timestamp.
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an "A<sig>@<expiry>" hint into perm_sig/perm_expiry."""
        try:
            sig, expiry = s[1:].split('@', 1)
        except ValueError:
            # Bug fix: when '@' is missing, str.split returns a single
            # element and the 2-tuple unpack raises ValueError -- it never
            # raises IndexError, so the previous `except IndexError` was
            # dead code and callers got an unhelpful unpacking error.
            raise ValueError("bad permission hint {}".format(s))
        # Assign outside the try so hex-validation errors from the property
        # setters keep their specific messages.
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired as of as_of_dt.

        Locators without a permission hint never expire.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # Bug fix: perm_expiry is a naive UTC datetime (built with
            # utcfromtimestamp), so it must be compared against utcnow();
            # the previous datetime.now() was off by the local UTC offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
133
134
class Keep(object):
    """Simple interface to a shared, global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with
    your own API client.  The global KeepClient builds an API client from
    the current Arvados configuration, which may not match the one you
    built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # KeepClient used to adapt to configuration changes at runtime.  We
        # emulate that here: if any relevant setting differs from what we
        # saw last time, throw away the old client and build a new one.
        current_key = (config.get('ARVADOS_API_HOST'),
                       config.get('ARVADOS_API_TOKEN'),
                       config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                       config.get('ARVADOS_KEEP_PROXY'),
                       config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                       os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is None or cls._last_key != current_key:
            global_client_object = KeepClient()
            cls._last_key = current_key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        # Delegate to the shared client.
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        # Delegate to the shared client.
        return Keep.global_client_object().put(data, **kwargs)
169
class KeepBlockCache(object):
    """In-memory cache of Keep block contents, most-recently-used first.

    Total cached payload is capped (via cap_cache()) at cache_max bytes;
    the default is 256 MiB of RAM.
    """

    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry: a locator, its content, and a readiness event."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until some thread has stored the content via set().
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            # Unfilled (or failed) slots contribute nothing to the total.
            return 0 if self.content is None else len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Discard slots that finished with no content: those represent
            # failed block reads and are not worth keeping around.
            self._cache = [slot for slot in self._cache
                           if not (slot.ready.is_set() and slot.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least-recently-used slot that is ready.
                for idx in range(len(self._cache) - 1, -1, -1):
                    if self._cache[idx].ready.is_set():
                        del self._cache[idx]
                        break
                total = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Linear scan; a hit is moved to the front to keep MRU ordering.
        for idx, slot in enumerate(self._cache):
            if slot.locator == locator:
                if idx != 0:
                    del self._cache[idx]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            slot = self._get(locator)
            if slot:
                return slot, False
            # No existing entry: create a fresh slot at the front.
            slot = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, slot)
            return slot, True
241
class Counter(object):
    """Thread-safe additive counter used for transfer statistics."""

    def __init__(self, v=0):
        # A lock guards every access so concurrent transfers can update the
        # same counter safely.
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically add v to the counter."""
        with self._lock:
            self._value += v

    def get(self):
        """Return the current total."""
        with self._lock:
            return self._value
254
255
256 class KeepClient(object):
257
258     # Default Keep server connection timeout:  2 seconds
259     # Default Keep server read timeout:       256 seconds
260     # Default Keep server bandwidth minimum:  32768 bytes per second
261     # Default Keep proxy connection timeout:  20 seconds
262     # Default Keep proxy read timeout:        256 seconds
263     # Default Keep proxy bandwidth minimum:   32768 bytes per second
264     DEFAULT_TIMEOUT = (2, 256, 32768)
265     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
266
267
268     class KeepService(object):
269         """Make requests to a single Keep service, and track results.
270
271         A KeepService is intended to last long enough to perform one
272         transaction (GET or PUT) against one Keep service. This can
273         involve calling either get() or put() multiple times in order
274         to retry after transient failures. However, calling both get()
275         and put() on a single instance -- or using the same instance
276         to access two different Keep services -- will not produce
277         sensible behavior.
278         """
279
280         HTTP_ERRORS = (
281             socket.error,
282             ssl.SSLError,
283             arvados.errors.HttpError,
284         )
285
286         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
287                      upload_counter=None,
288                      download_counter=None, **headers):
289             self.root = root
290             self._user_agent_pool = user_agent_pool
291             self._result = {'error': None}
292             self._usable = True
293             self._session = None
294             self._socket = None
295             self.get_headers = {'Accept': 'application/octet-stream'}
296             self.get_headers.update(headers)
297             self.put_headers = headers
298             self.upload_counter = upload_counter
299             self.download_counter = download_counter
300
        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            # 'error' is False only after a successful request completed;
            # any truthy error value means the last attempt failed.
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # Dict describing the most recent request.  On success it holds
            # 'status_code', 'body', 'headers', and 'error' == False; after
            # a transport-level failure it may contain only 'error'.
            return self._result
311
312         def _get_user_agent(self):
313             try:
314                 return self._user_agent_pool.get(block=False)
315             except queue.Empty:
316                 return pycurl.Curl()
317
318         def _put_user_agent(self, ua):
319             try:
320                 ua.reset()
321                 self._user_agent_pool.put(ua, block=False)
322             except:
323                 ua.close()
324
        def _socket_open(self, *args, **kwargs):
            # pycurl changed the OPENSOCKETFUNCTION callback signature in
            # 7.21.5 (purpose + address object vs. four separate values);
            # dispatch on argument count to support both API versions.
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            # Adapt the old callback signature to the new one by packing the
            # separate arguments into an address-like namedtuple.
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            # Keep a reference so get()/put() can close the socket in their
            # finally blocks after curl.perform() returns.
            self._socket = s
            return s
348
349         def get(self, locator, method="GET", timeout=None):
350             # locator is a KeepLocator object.
351             url = self.root + str(locator)
352             _logger.debug("Request: %s %s", method, url)
353             curl = self._get_user_agent()
354             ok = None
355             try:
356                 with timer.Timer() as t:
357                     self._headers = {}
358                     response_body = BytesIO()
359                     curl.setopt(pycurl.NOSIGNAL, 1)
360                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
361                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
362                     curl.setopt(pycurl.URL, url.encode('utf-8'))
363                     curl.setopt(pycurl.HTTPHEADER, [
364                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
365                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
366                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
367                     if method == "HEAD":
368                         curl.setopt(pycurl.NOBODY, True)
369                     self._setcurltimeouts(curl, timeout)
370
371                     try:
372                         curl.perform()
373                     except Exception as e:
374                         raise arvados.errors.HttpError(0, str(e))
375                     finally:
376                         if self._socket:
377                             self._socket.close()
378                             self._socket = None
379                     self._result = {
380                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
381                         'body': response_body.getvalue(),
382                         'headers': self._headers,
383                         'error': False,
384                     }
385
386                 ok = retry.check_http_response_success(self._result['status_code'])
387                 if not ok:
388                     self._result['error'] = arvados.errors.HttpError(
389                         self._result['status_code'],
390                         self._headers.get('x-status-line', 'Error'))
391             except self.HTTP_ERRORS as e:
392                 self._result = {
393                     'error': e,
394                 }
395             self._usable = ok != False
396             if self._result.get('status_code', None):
397                 # The client worked well enough to get an HTTP status
398                 # code, so presumably any problems are just on the
399                 # server side and it's OK to reuse the client.
400                 self._put_user_agent(curl)
401             else:
402                 # Don't return this client to the pool, in case it's
403                 # broken.
404                 curl.close()
405             if not ok:
406                 _logger.debug("Request fail: GET %s => %s: %s",
407                               url, type(self._result['error']), str(self._result['error']))
408                 return None
409             if method == "HEAD":
410                 _logger.info("HEAD %s: %s bytes",
411                          self._result['status_code'],
412                          self._result.get('content-length'))
413                 return True
414
415             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
416                          self._result['status_code'],
417                          len(self._result['body']),
418                          t.msecs,
419                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
420
421             if self.download_counter:
422                 self.download_counter.add(len(self._result['body']))
423             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
424             if resp_md5 != locator.md5sum:
425                 _logger.warning("Checksum fail: md5(%s) = %s",
426                                 url, resp_md5)
427                 self._result['error'] = arvados.errors.HttpError(
428                     0, 'Checksum fail')
429                 return None
430             return self._result['body']
431
        def put(self, hash_s, body, timeout=None):
            """Upload one block; return True on success, False on failure.

            :hash_s: the block's md5 hex digest (string), appended to the
              service root to form the request URL.
            :body: the block content (bytes).
            :timeout: passed through to _setcurltimeouts().

            Details of the attempt are left in self._result for
            last_result().
            """
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        # Always release the socket opened by _socket_open.
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True
501
502         def _setcurltimeouts(self, curl, timeouts):
503             if not timeouts:
504                 return
505             elif isinstance(timeouts, tuple):
506                 if len(timeouts) == 2:
507                     conn_t, xfer_t = timeouts
508                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
509                 else:
510                     conn_t, xfer_t, bandwidth_bps = timeouts
511             else:
512                 conn_t, xfer_t = (timeouts, timeouts)
513                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
514             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
515             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
516             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
517
        def _headerfunction(self, header_line):
            """pycurl HEADERFUNCTION callback: accumulate response headers.

            Header names are lowercased into self._headers; a line with no
            ':' is treated as a continuation of the previous header, and
            the HTTP status line is stored under 'x-status-line'.
            """
            if isinstance(header_line, bytes):
                # curl hands us raw bytes; HTTP header fields are safe to
                # decode as latin-1.
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # Continuation of a folded (multi-line) header value.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
537     
538
    class KeepWriterQueue(queue.Queue):
        """Work queue coordinating parallel PUTs of a single block.

        Entries are (KeepService, service_root) pairs.  The queue tracks how
        many replicas have been confirmed stored and how many write attempts
        ("tries") may still be issued, so writer threads know when to stop.
        """

        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies           # replicas the caller asked for
            self.successful_copies = 0            # replicas confirmed stored
            self.response = None                  # locator from the last success
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies           # attempts still allowed in flight
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            # Record replicas_nr newly stored replicas, then wake every
            # waiting worker so it can re-check whether enough copies exist.
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # A failed attempt returns its "try" to the pool so another
            # service can be attempted; wake one waiting worker.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            # Number of replicas still needed.
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            """Return the next (service, service_root) pair to try.

            Blocks while no try slots are available; raises queue.Empty once
            enough copies are stored or no candidate services remain.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # Skip services that already succeeded or
                            # failed permanently.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()
592
593
594     class KeepWriterThreadPool(object):
595         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
596             self.total_task_nr = 0
597             self.wanted_copies = copies
598             if (not max_service_replicas) or (max_service_replicas >= copies):
599                 num_threads = 1
600             else:
601                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
602             _logger.debug("Pool max threads is %d", num_threads)
603             self.workers = []
604             self.queue = KeepClient.KeepWriterQueue(copies)
605             # Create workers
606             for _ in range(num_threads):
607                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
608                 self.workers.append(w)
609         
610         def add_task(self, ks, service_root):
611             self.queue.put((ks, service_root))
612             self.total_task_nr += 1
613         
614         def done(self):
615             return self.queue.successful_copies
616         
617         def join(self):
618             # Start workers
619             for worker in self.workers:
620                 worker.start()
621             # Wait for finished work
622             self.queue.join()
623         
624         def response(self):
625             return self.queue.response
626     
627     
    class KeepWriterThread(threading.Thread):
        """Daemon thread that takes Keep services from a KeepWriterQueue and
        attempts to PUT the block to each one."""

        # Sentinel exception instance: raised and compared by identity to
        # distinguish an expected request failure from an unexpected bug.
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout        # per-request timeout passed to put()
            self.queue = queue            # KeepWriterQueue of (service, root)
            self.data = data              # block content
            self.data_hash = data_hash    # md5 hex digest of data
            self.daemon = True

        def run(self):
            # Loop until the queue signals completion by raising queue.Empty.
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        # Unexpected failure: log the traceback for debugging.
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """PUT the block to one service.

            Returns (locator, replicas_stored) on success; raises
            self.TaskFailed on an ordinary request failure.
            """
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                # Trust the server's replica-count header when present.
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored
681
682
683     def __init__(self, api_client=None, proxy=None,
684                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
685                  api_token=None, local_store=None, block_cache=None,
686                  num_retries=0, session=None):
687         """Initialize a new KeepClient.
688
689         Arguments:
690         :api_client:
691           The API client to use to find Keep services.  If not
692           provided, KeepClient will build one from available Arvados
693           configuration.
694
695         :proxy:
696           If specified, this KeepClient will send requests to this Keep
697           proxy.  Otherwise, KeepClient will fall back to the setting of the
698           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
699           If you want to KeepClient does not use a proxy, pass in an empty
700           string.
701
702         :timeout:
703           The initial timeout (in seconds) for HTTP requests to Keep
704           non-proxy servers.  A tuple of three floats is interpreted as
705           (connection_timeout, read_timeout, minimum_bandwidth). A connection
706           will be aborted if the average traffic rate falls below
707           minimum_bandwidth bytes per second over an interval of read_timeout
708           seconds. Because timeouts are often a result of transient server
709           load, the actual connection timeout will be increased by a factor
710           of two on each retry.
711           Default: (2, 256, 32768).
712
713         :proxy_timeout:
714           The initial timeout (in seconds) for HTTP requests to
715           Keep proxies. A tuple of three floats is interpreted as
716           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
717           described above for adjusting connection timeouts on retry also
718           applies.
719           Default: (20, 256, 32768).
720
721         :api_token:
722           If you're not using an API client, but only talking
723           directly to a Keep proxy, this parameter specifies an API token
724           to authenticate Keep requests.  It is an error to specify both
725           api_client and api_token.  If you specify neither, KeepClient
726           will use one available from the Arvados configuration.
727
728         :local_store:
729           If specified, this KeepClient will bypass Keep
730           services, and save data to the named directory.  If unspecified,
731           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
732           environment variable.  If you want to ensure KeepClient does not
733           use local storage, pass in an empty string.  This is primarily
734           intended to mock a server for testing.
735
736         :num_retries:
737           The default number of times to retry failed requests.
738           This will be used as the default num_retries value when get() and
739           put() are called.  Default 0.
740         """
741         self.lock = threading.Lock()
742         if proxy is None:
743             if config.get('ARVADOS_KEEP_SERVICES'):
744                 proxy = config.get('ARVADOS_KEEP_SERVICES')
745             else:
746                 proxy = config.get('ARVADOS_KEEP_PROXY')
747         if api_token is None:
748             if api_client is None:
749                 api_token = config.get('ARVADOS_API_TOKEN')
750             else:
751                 api_token = api_client.api_token
752         elif api_client is not None:
753             raise ValueError(
754                 "can't build KeepClient with both API client and token")
755         if local_store is None:
756             local_store = os.environ.get('KEEP_LOCAL_STORE')
757
758         self.block_cache = block_cache if block_cache else KeepBlockCache()
759         self.timeout = timeout
760         self.proxy_timeout = proxy_timeout
761         self._user_agent_pool = queue.LifoQueue()
762         self.upload_counter = Counter()
763         self.download_counter = Counter()
764         self.put_counter = Counter()
765         self.get_counter = Counter()
766         self.hits_counter = Counter()
767         self.misses_counter = Counter()
768
769         if local_store:
770             self.local_store = local_store
771             self.get = self.local_store_get
772             self.put = self.local_store_put
773         else:
774             self.num_retries = num_retries
775             self.max_replicas_per_service = None
776             if proxy:
777                 proxy_uris = proxy.split()
778                 for i in range(len(proxy_uris)):
779                     if not proxy_uris[i].endswith('/'):
780                         proxy_uris[i] += '/'
781                     # URL validation
782                     url = urllib.parse.urlparse(proxy_uris[i])
783                     if not (url.scheme and url.netloc):
784                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
785                 self.api_token = api_token
786                 self._gateway_services = {}
787                 self._keep_services = [{
788                     'uuid': "00000-bi6l4-%015d" % idx,
789                     'service_type': 'proxy',
790                     '_service_root': uri,
791                     } for idx, uri in enumerate(proxy_uris)]
792                 self._writable_services = self._keep_services
793                 self.using_proxy = True
794                 self._static_services_list = True
795             else:
796                 # It's important to avoid instantiating an API client
797                 # unless we actually need one, for testing's sake.
798                 if api_client is None:
799                     api_client = arvados.api('v1')
800                 self.api_client = api_client
801                 self.api_token = api_client.api_token
802                 self._gateway_services = {}
803                 self._keep_services = None
804                 self._writable_services = None
805                 self.using_proxy = None
806                 self._static_services_list = False
807
808     def current_timeout(self, attempt_number):
809         """Return the appropriate timeout to use for this client.
810
811         The proxy timeout setting if the backend service is currently a proxy,
812         the regular timeout setting otherwise.  The `attempt_number` indicates
813         how many times the operation has been tried already (starting from 0
814         for the first try), and scales the connection timeout portion of the
815         return value accordingly.
816
817         """
818         # TODO(twp): the timeout should be a property of a
819         # KeepService, not a KeepClient. See #4488.
820         t = self.proxy_timeout if self.using_proxy else self.timeout
821         if len(t) == 2:
822             return (t[0] * (1 << attempt_number), t[1])
823         else:
824             return (t[0] * (1 << attempt_number), t[1], t[2])
825     def _any_nondisk_services(self, service_list):
826         return any(ks.get('service_type', 'disk') != 'disk'
827                    for ks in service_list)
828
829     def build_services_list(self, force_rebuild=False):
830         if (self._static_services_list or
831               (self._keep_services and not force_rebuild)):
832             return
833         with self.lock:
834             try:
835                 keep_services = self.api_client.keep_services().accessible()
836             except Exception:  # API server predates Keep services.
837                 keep_services = self.api_client.keep_disks().list()
838
839             # Gateway services are only used when specified by UUID,
840             # so there's nothing to gain by filtering them by
841             # service_type.
842             self._gateway_services = {ks['uuid']: ks for ks in
843                                       keep_services.execute()['items']}
844             if not self._gateway_services:
845                 raise arvados.errors.NoKeepServersError()
846
847             # Precompute the base URI for each service.
848             for r in self._gateway_services.values():
849                 host = r['service_host']
850                 if not host.startswith('[') and host.find(':') >= 0:
851                     # IPv6 URIs must be formatted like http://[::1]:80/...
852                     host = '[' + host + ']'
853                 r['_service_root'] = "{}://{}:{:d}/".format(
854                     'https' if r['service_ssl_flag'] else 'http',
855                     host,
856                     r['service_port'])
857
858             _logger.debug(str(self._gateway_services))
859             self._keep_services = [
860                 ks for ks in self._gateway_services.values()
861                 if not ks.get('service_type', '').startswith('gateway:')]
862             self._writable_services = [ks for ks in self._keep_services
863                                        if not ks.get('read_only')]
864
865             # For disk type services, max_replicas_per_service is 1
866             # It is unknown (unlimited) for other service types.
867             if self._any_nondisk_services(self._writable_services):
868                 self.max_replicas_per_service = None
869             else:
870                 self.max_replicas_per_service = 1
871
872     def _service_weight(self, data_hash, service_uuid):
873         """Compute the weight of a Keep service endpoint for a data
874         block with a known hash.
875
876         The weight is md5(h + u) where u is the last 15 characters of
877         the service endpoint's UUID.
878         """
879         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
880
881     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
882         """Return an array of Keep service endpoints, in the order in
883         which they should be probed when reading or writing data with
884         the given hash+hints.
885         """
886         self.build_services_list(force_rebuild)
887
888         sorted_roots = []
889         # Use the services indicated by the given +K@... remote
890         # service hints, if any are present and can be resolved to a
891         # URI.
892         for hint in locator.hints:
893             if hint.startswith('K@'):
894                 if len(hint) == 7:
895                     sorted_roots.append(
896                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
897                 elif len(hint) == 29:
898                     svc = self._gateway_services.get(hint[2:])
899                     if svc:
900                         sorted_roots.append(svc['_service_root'])
901
902         # Sort the available local services by weight (heaviest first)
903         # for this locator, and return their service_roots (base URIs)
904         # in that order.
905         use_services = self._keep_services
906         if need_writable:
907             use_services = self._writable_services
908         self.using_proxy = self._any_nondisk_services(use_services)
909         sorted_roots.extend([
910             svc['_service_root'] for svc in sorted(
911                 use_services,
912                 reverse=True,
913                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
914         _logger.debug("{}: {}".format(locator, sorted_roots))
915         return sorted_roots
916
917     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
918         # roots_map is a dictionary, mapping Keep service root strings
919         # to KeepService objects.  Poll for Keep services, and add any
920         # new ones to roots_map.  Return the current list of local
921         # root strings.
922         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
923         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
924         for root in local_roots:
925             if root not in roots_map:
926                 roots_map[root] = self.KeepService(
927                     root, self._user_agent_pool,
928                     upload_counter=self.upload_counter,
929                     download_counter=self.download_counter,
930                     **headers)
931         return local_roots
932
933     @staticmethod
934     def _check_loop_result(result):
935         # KeepClient RetryLoops should save results as a 2-tuple: the
936         # actual result of the request, and the number of servers available
937         # to receive the request this round.
938         # This method returns True if there's a real result, False if
939         # there are no more servers available, otherwise None.
940         if isinstance(result, Exception):
941             return None
942         result, tried_server_count = result
943         if (result is not None) and (result is not False):
944             return True
945         elif tried_server_count < 1:
946             _logger.info("No more Keep services to try; giving up")
947             return False
948         else:
949             return None
950
951     def get_from_cache(self, loc):
952         """Fetch a block only if is in the cache, otherwise return None."""
953         slot = self.block_cache.get(loc)
954         if slot is not None and slot.ready.is_set():
955             return slot.get()
956         else:
957             return None
958
959     @retry.retry_method
960     def head(self, loc_s, num_retries=None):
961         return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
962
963     @retry.retry_method
964     def get(self, loc_s, num_retries=None):
965         return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
966
967     def _get_or_head(self, loc_s, method="GET", num_retries=None):
968         """Get data from Keep.
969
970         This method fetches one or more blocks of data from Keep.  It
971         sends a request each Keep service registered with the API
972         server (or the proxy provided when this client was
973         instantiated), then each service named in location hints, in
974         sequence.  As soon as one service provides the data, it's
975         returned.
976
977         Arguments:
978         * loc_s: A string of one or more comma-separated locators to fetch.
979           This method returns the concatenation of these blocks.
980         * num_retries: The number of times to retry GET requests to
981           *each* Keep server if it returns temporary failures, with
982           exponential backoff.  Note that, in each loop, the method may try
983           to fetch data from every available Keep service, along with any
984           that are named in location hints in the locator.  The default value
985           is set when the KeepClient is initialized.
986         """
987         if ',' in loc_s:
988             return ''.join(self.get(x) for x in loc_s.split(','))
989
990         self.get_counter.add(1)
991
992         locator = KeepLocator(loc_s)
993         if method == "GET":
994             slot, first = self.block_cache.reserve_cache(locator.md5sum)
995             if not first:
996                 self.hits_counter.add(1)
997                 v = slot.get()
998                 return v
999
1000         self.misses_counter.add(1)
1001
1002         # If the locator has hints specifying a prefix (indicating a
1003         # remote keepproxy) or the UUID of a local gateway service,
1004         # read data from the indicated service(s) instead of the usual
1005         # list of local disk services.
1006         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1007                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1008         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1009                            for hint in locator.hints if (
1010                                    hint.startswith('K@') and
1011                                    len(hint) == 29 and
1012                                    self._gateway_services.get(hint[2:])
1013                                    )])
1014         # Map root URLs to their KeepService objects.
1015         roots_map = {
1016             root: self.KeepService(root, self._user_agent_pool,
1017                                    upload_counter=self.upload_counter,
1018                                    download_counter=self.download_counter)
1019             for root in hint_roots
1020         }
1021
1022         # See #3147 for a discussion of the loop implementation.  Highlights:
1023         # * Refresh the list of Keep services after each failure, in case
1024         #   it's being updated.
1025         # * Retry until we succeed, we're out of retries, or every available
1026         #   service has returned permanent failure.
1027         sorted_roots = []
1028         roots_map = {}
1029         blob = None
1030         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1031                                backoff_start=2)
1032         for tries_left in loop:
1033             try:
1034                 sorted_roots = self.map_new_services(
1035                     roots_map, locator,
1036                     force_rebuild=(tries_left < num_retries),
1037                     need_writable=False)
1038             except Exception as error:
1039                 loop.save_result(error)
1040                 continue
1041
1042             # Query KeepService objects that haven't returned
1043             # permanent failure, in our specified shuffle order.
1044             services_to_try = [roots_map[root]
1045                                for root in sorted_roots
1046                                if roots_map[root].usable()]
1047             for keep_service in services_to_try:
1048                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1049                 if blob is not None:
1050                     break
1051             loop.save_result((blob, len(services_to_try)))
1052
1053         # Always cache the result, then return it if we succeeded.
1054         if method == "GET":
1055             slot.set(blob)
1056             self.block_cache.cap_cache()
1057         if loop.success():
1058             if method == "HEAD":
1059                 return True
1060             else:
1061                 return blob
1062
1063         # Q: Including 403 is necessary for the Keep tests to continue
1064         # passing, but maybe they should expect KeepReadError instead?
1065         not_founds = sum(1 for key in sorted_roots
1066                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1067         service_errors = ((key, roots_map[key].last_result()['error'])
1068                           for key in sorted_roots)
1069         if not roots_map:
1070             raise arvados.errors.KeepReadError(
1071                 "failed to read {}: no Keep services available ({})".format(
1072                     loc_s, loop.last_result()))
1073         elif not_founds == len(sorted_roots):
1074             raise arvados.errors.NotFoundError(
1075                 "{} not found".format(loc_s), service_errors)
1076         else:
1077             raise arvados.errors.KeepReadError(
1078                 "failed to read {}".format(loc_s), service_errors, label="service")
1079
1080     @retry.retry_method
1081     def put(self, data, copies=2, num_retries=None):
1082         """Save data in Keep.
1083
1084         This method will get a list of Keep services from the API server, and
1085         send the data to each one simultaneously in a new thread.  Once the
1086         uploads are finished, if enough copies are saved, this method returns
1087         the most recent HTTP response body.  If requests fail to upload
1088         enough copies, this method raises KeepWriteError.
1089
1090         Arguments:
1091         * data: The string of data to upload.
1092         * copies: The number of copies that the user requires be saved.
1093           Default 2.
1094         * num_retries: The number of times to retry PUT requests to
1095           *each* Keep server if it returns temporary failures, with
1096           exponential backoff.  The default value is set when the
1097           KeepClient is initialized.
1098         """
1099
1100         if not isinstance(data, bytes):
1101             data = data.encode()
1102
1103         self.put_counter.add(1)
1104
1105         data_hash = hashlib.md5(data).hexdigest()
1106         loc_s = data_hash + '+' + str(len(data))
1107         if copies < 1:
1108             return loc_s
1109         locator = KeepLocator(loc_s)
1110
1111         headers = {}
1112         # Tell the proxy how many copies we want it to store
1113         headers['X-Keep-Desired-Replicas'] = str(copies)
1114         roots_map = {}
1115         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1116                                backoff_start=2)
1117         done = 0
1118         for tries_left in loop:
1119             try:
1120                 sorted_roots = self.map_new_services(
1121                     roots_map, locator,
1122                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1123             except Exception as error:
1124                 loop.save_result(error)
1125                 continue
1126
1127             writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
1128                                                         data_hash=data_hash,
1129                                                         copies=copies - done,
1130                                                         max_service_replicas=self.max_replicas_per_service,
1131                                                         timeout=self.current_timeout(num_retries - tries_left))
1132             for service_root, ks in [(root, roots_map[root])
1133                                      for root in sorted_roots]:
1134                 if ks.finished():
1135                     continue
1136                 writer_pool.add_task(ks, service_root)
1137             writer_pool.join()
1138             done += writer_pool.done()
1139             loop.save_result((done >= copies, writer_pool.total_task_nr))
1140
1141         if loop.success():
1142             return writer_pool.response()
1143         if not roots_map:
1144             raise arvados.errors.KeepWriteError(
1145                 "failed to write {}: no Keep services available ({})".format(
1146                     data_hash, loop.last_result()))
1147         else:
1148             service_errors = ((key, roots_map[key].last_result()['error'])
1149                               for key in sorted_roots
1150                               if roots_map[key].last_result()['error'])
1151             raise arvados.errors.KeepWriteError(
1152                 "failed to write {} (wanted {} copies but wrote {})".format(
1153                     data_hash, copies, writer_pool.done()), service_errors, label="service")
1154
1155     def local_store_put(self, data, copies=1, num_retries=None):
1156         """A stub for put().
1157
1158         This method is used in place of the real put() method when
1159         using local storage (see constructor's local_store argument).
1160
1161         copies and num_retries arguments are ignored: they are here
1162         only for the sake of offering the same call signature as
1163         put().
1164
1165         Data stored this way can be retrieved via local_store_get().
1166         """
1167         md5 = hashlib.md5(data).hexdigest()
1168         locator = '%s+%d' % (md5, len(data))
1169         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1170             f.write(data)
1171         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1172                   os.path.join(self.local_store, md5))
1173         return locator
1174
1175     def local_store_get(self, loc_s, num_retries=None):
1176         """Companion to local_store_put()."""
1177         try:
1178             locator = KeepLocator(loc_s)
1179         except ValueError:
1180             raise arvados.errors.NotFoundError(
1181                 "Invalid data locator: '%s'" % loc_s)
1182         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1183             return b''
1184         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1185             return f.read()
1186
1187     def is_cached(self, locator):
1188         return self.block_cache.reserve_cache(expect_hash)