12055: Merge branch 'master' into 12055-nodemanager-ec2-tags
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import native_str
9 standard_library.install_aliases()
10 from builtins import next
11 from builtins import str
12 from builtins import range
13 from builtins import object
14 import collections
15 import datetime
16 import hashlib
17 import io
18 import logging
19 import math
20 import os
21 import pycurl
22 import queue
23 import re
24 import socket
25 import ssl
26 import sys
27 import threading
28 from . import timer
29 import urllib.parse
30
31 if sys.version_info >= (3, 0):
32     from io import BytesIO
33 else:
34     from cStringIO import StringIO as BytesIO
35
36 import arvados
37 import arvados.config as config
38 import arvados.errors
39 import arvados.retry as retry
40 import arvados.util
41
_logger = logging.getLogger('arvados.keep')
# Lazily-built KeepClient shared through the deprecated Keep class.
global_client_object = None


# macOS may not expose these TCP keepalive constants; define the missing ones
# using the values from Apple's netinet/tcp.h:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    for _const_name, _const_value in (('TCP_KEEPALIVE', 0x010),
                                      ('TCP_KEEPINTVL', 0x101),
                                      ('TCP_KEEPCNT', 0x102)):
        if not hasattr(socket, _const_name):
            setattr(socket, _const_name, _const_value)
    del _const_name, _const_value
55
56
class KeepLocator(object):
    """Parsed representation of a Keep block locator.

    A locator string has the form ``md5sum[+size][+hint...]``.  Every hint
    must match HINT_RE.  A hint beginning with ``A`` is a permission hint of
    the form ``A<perm_sig>@<hex expiry timestamp>`` and is split into
    perm_sig and perm_expiry; all other hints are kept, in order, in
    ``self.hints``.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str.

        Raises ValueError if the md5sum, size, or any hint is malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare md5sum with no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Re-serialize as md5sum[+size][+permission hint][+other hints]."""
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without hints: ``md5sum+size``, or just md5sum."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.  This runs at
        # class-definition time as a plain function, not as a method.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unsigned."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # value is the 1-8 digit hex Unix timestamp taken from a permission
        # hint; it is stored as a naive datetime in UTC.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<8-hex-digit expiry>`` hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a permission hint ``A<sig>@<expiry>``.

        Raises ValueError if the hint has no ``@`` separator, or if either
        part fails validation in the property setters.
        """
        # str.split() never raises IndexError here; a missing '@' used to
        # surface as an opaque tuple-unpacking ValueError instead of this
        # message, so check for the separator explicitly.
        sig, sep, expiry = s[1:].partition('@')
        if not sep:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature expired at or before as_of_dt.

        as_of_dt must be a naive UTC datetime; it defaults to the current UTC
        time.  Locators without a permission hint never expire.
        """
        if self.perm_expiry is None:
            return False
        if as_of_dt is None:
            # perm_expiry is naive UTC (built with utcfromtimestamp), so the
            # comparison must use UTC "now", not local time.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
139
140
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Settings tuple that the current global client was built for.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if settings changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        api_host = config.get('ARVADOS_API_HOST')
        api_token = config.get('ARVADOS_API_TOKEN')
        insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        keep_proxy = config.get('ARVADOS_KEEP_PROXY')
        external = config.get('ARVADOS_EXTERNAL_CLIENT') == 'true'
        local_store = os.environ.get('KEEP_LOCAL_STORE')
        settings = (api_host, api_token, insecure,
                    keep_proxy, external, local_store)

        if global_client_object is not None and cls._last_key == settings:
            return global_client_object
        global_client_object = KeepClient()
        cls._last_key = settings
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Deprecated: read a block through the global KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Deprecated: write a block through the global KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
class KeepBlockCache(object):
    """Lock-protected cache of Keep block contents, most recently used first.

    Slots are kept in a list ordered from most to least recently used:
    reserve_cache() inserts new slots at the front and get() moves a hit to
    the front.  cap_cache() evicts completed slots from the back until the
    total cached size is at most cache_max bytes.
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry; get() blocks until set() has been called."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Wait until the slot is filled, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store value (None signals a failed read) and wake all readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return len(content), or 0 while no content is stored."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while self._cache and sm > self.cache_max:
                # Evict the least recently used slot that is complete.
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                else:
                    # Nothing is evictable (every remaining slot is still
                    # being filled).  Stop here instead of spinning forever
                    # while holding the lock.
                    break
                sm = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Return the slot for locator (moving it to the front), or None.
        # Caller must hold self._cache_lock.
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if it is not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns (slot, created), where created is True only if a new slot
        was added.'''
        with self._cache_lock:
            n = self._get(locator)
            if n is not None:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
class Counter(object):
    """Numeric accumulator whose reads and updates are serialized by a lock."""

    def __init__(self, v=0):
        self._val = v
        self._lk = threading.Lock()

    def add(self, v):
        """Increase the stored total by v."""
        self._lk.acquire()
        try:
            self._val += v
        finally:
            self._lk.release()

    def get(self):
        """Return the current total."""
        self._lk.acquire()
        try:
            return self._val
        finally:
            self._lk.release()
260
261
262 class KeepClient(object):
263
    # Timeout tuples are (connection_timeout, read_timeout, minimum_bandwidth):
    # seconds to establish a connection, and a transfer is aborted if its
    # average rate stays below minimum_bandwidth bytes per second over an
    # interval of read_timeout seconds (see KeepService._setcurltimeouts).
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
272
273
274     class KeepService(object):
275         """Make requests to a single Keep service, and track results.
276
277         A KeepService is intended to last long enough to perform one
278         transaction (GET or PUT) against one Keep service. This can
279         involve calling either get() or put() multiple times in order
280         to retry after transient failures. However, calling both get()
281         and put() on a single instance -- or using the same instance
282         to access two different Keep services -- will not produce
283         sensible behavior.
284         """
285
286         HTTP_ERRORS = (
287             socket.error,
288             ssl.SSLError,
289             arvados.errors.HttpError,
290         )
291
292         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
293                      upload_counter=None,
294                      download_counter=None, **headers):
295             self.root = root
296             self._user_agent_pool = user_agent_pool
297             self._result = {'error': None}
298             self._usable = True
299             self._session = None
300             self._socket = None
301             self.get_headers = {'Accept': 'application/octet-stream'}
302             self.get_headers.update(headers)
303             self.put_headers = headers
304             self.upload_counter = upload_counter
305             self.download_counter = download_counter
306
307         def usable(self):
308             """Is it worth attempting a request?"""
309             return self._usable
310
311         def finished(self):
312             """Did the request succeed or encounter permanent failure?"""
313             return self._result['error'] == False or not self._usable
314
315         def last_result(self):
316             return self._result
317
318         def _get_user_agent(self):
319             try:
320                 return self._user_agent_pool.get(block=False)
321             except queue.Empty:
322                 return pycurl.Curl()
323
324         def _put_user_agent(self, ua):
325             try:
326                 ua.reset()
327                 self._user_agent_pool.put(ua, block=False)
328             except:
329                 ua.close()
330
331         def _socket_open(self, *args, **kwargs):
332             if len(args) + len(kwargs) == 2:
333                 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
334             else:
335                 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
336
337         def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
338             return self._socket_open_pycurl_7_21_5(
339                 purpose=None,
340                 address=collections.namedtuple(
341                     'Address', ['family', 'socktype', 'protocol', 'addr'],
342                 )(family, socktype, protocol, address))
343
344         def _socket_open_pycurl_7_21_5(self, purpose, address):
345             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
346             s = socket.socket(address.family, address.socktype, address.protocol)
347             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
348             # Will throw invalid protocol error on mac. This test prevents that.
349             if hasattr(socket, 'TCP_KEEPIDLE'):
350                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
351             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
352             self._socket = s
353             return s
354
355         def get(self, locator, method="GET", timeout=None):
356             # locator is a KeepLocator object.
357             url = self.root + str(locator)
358             _logger.debug("Request: %s %s", method, url)
359             curl = self._get_user_agent()
360             ok = None
361             try:
362                 with timer.Timer() as t:
363                     self._headers = {}
364                     response_body = BytesIO()
365                     curl.setopt(pycurl.NOSIGNAL, 1)
366                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
367                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
368                     curl.setopt(pycurl.URL, url.encode('utf-8'))
369                     curl.setopt(pycurl.HTTPHEADER, [
370                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
371                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
372                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
373                     if method == "HEAD":
374                         curl.setopt(pycurl.NOBODY, True)
375                     self._setcurltimeouts(curl, timeout)
376
377                     try:
378                         curl.perform()
379                     except Exception as e:
380                         raise arvados.errors.HttpError(0, str(e))
381                     finally:
382                         if self._socket:
383                             self._socket.close()
384                             self._socket = None
385                     self._result = {
386                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
387                         'body': response_body.getvalue(),
388                         'headers': self._headers,
389                         'error': False,
390                     }
391
392                 ok = retry.check_http_response_success(self._result['status_code'])
393                 if not ok:
394                     self._result['error'] = arvados.errors.HttpError(
395                         self._result['status_code'],
396                         self._headers.get('x-status-line', 'Error'))
397             except self.HTTP_ERRORS as e:
398                 self._result = {
399                     'error': e,
400                 }
401             self._usable = ok != False
402             if self._result.get('status_code', None):
403                 # The client worked well enough to get an HTTP status
404                 # code, so presumably any problems are just on the
405                 # server side and it's OK to reuse the client.
406                 self._put_user_agent(curl)
407             else:
408                 # Don't return this client to the pool, in case it's
409                 # broken.
410                 curl.close()
411             if not ok:
412                 _logger.debug("Request fail: GET %s => %s: %s",
413                               url, type(self._result['error']), str(self._result['error']))
414                 return None
415             if method == "HEAD":
416                 _logger.info("HEAD %s: %s bytes",
417                          self._result['status_code'],
418                          self._result.get('content-length'))
419                 return True
420
421             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
422                          self._result['status_code'],
423                          len(self._result['body']),
424                          t.msecs,
425                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
426
427             if self.download_counter:
428                 self.download_counter.add(len(self._result['body']))
429             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
430             if resp_md5 != locator.md5sum:
431                 _logger.warning("Checksum fail: md5(%s) = %s",
432                                 url, resp_md5)
433                 self._result['error'] = arvados.errors.HttpError(
434                     0, 'Checksum fail')
435                 return None
436             return self._result['body']
437
438         def put(self, hash_s, body, timeout=None):
439             url = self.root + hash_s
440             _logger.debug("Request: PUT %s", url)
441             curl = self._get_user_agent()
442             ok = None
443             try:
444                 with timer.Timer() as t:
445                     self._headers = {}
446                     body_reader = BytesIO(body)
447                     response_body = BytesIO()
448                     curl.setopt(pycurl.NOSIGNAL, 1)
449                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
450                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
451                     curl.setopt(pycurl.URL, url.encode('utf-8'))
452                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
453                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
454                     # response) instead of sending the request body immediately.
455                     # This allows the server to reject the request if the request
456                     # is invalid or the server is read-only, without waiting for
457                     # the client to send the entire block.
458                     curl.setopt(pycurl.UPLOAD, True)
459                     curl.setopt(pycurl.INFILESIZE, len(body))
460                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
461                     curl.setopt(pycurl.HTTPHEADER, [
462                         '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
463                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
464                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
465                     self._setcurltimeouts(curl, timeout)
466                     try:
467                         curl.perform()
468                     except Exception as e:
469                         raise arvados.errors.HttpError(0, str(e))
470                     finally:
471                         if self._socket:
472                             self._socket.close()
473                             self._socket = None
474                     self._result = {
475                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
476                         'body': response_body.getvalue().decode('utf-8'),
477                         'headers': self._headers,
478                         'error': False,
479                     }
480                 ok = retry.check_http_response_success(self._result['status_code'])
481                 if not ok:
482                     self._result['error'] = arvados.errors.HttpError(
483                         self._result['status_code'],
484                         self._headers.get('x-status-line', 'Error'))
485             except self.HTTP_ERRORS as e:
486                 self._result = {
487                     'error': e,
488                 }
489             self._usable = ok != False # still usable if ok is True or None
490             if self._result.get('status_code', None):
491                 # Client is functional. See comment in get().
492                 self._put_user_agent(curl)
493             else:
494                 curl.close()
495             if not ok:
496                 _logger.debug("Request fail: PUT %s => %s: %s",
497                               url, type(self._result['error']), str(self._result['error']))
498                 return False
499             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
500                          self._result['status_code'],
501                          len(body),
502                          t.msecs,
503                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
504             if self.upload_counter:
505                 self.upload_counter.add(len(body))
506             return True
507
508         def _setcurltimeouts(self, curl, timeouts):
509             if not timeouts:
510                 return
511             elif isinstance(timeouts, tuple):
512                 if len(timeouts) == 2:
513                     conn_t, xfer_t = timeouts
514                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
515                 else:
516                     conn_t, xfer_t, bandwidth_bps = timeouts
517             else:
518                 conn_t, xfer_t = (timeouts, timeouts)
519                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
520             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
521             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
522             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
523
524         def _headerfunction(self, header_line):
525             if isinstance(header_line, bytes):
526                 header_line = header_line.decode('iso-8859-1')
527             if ':' in header_line:
528                 name, value = header_line.split(':', 1)
529                 name = name.strip().lower()
530                 value = value.strip()
531             elif self._headers:
532                 name = self._lastheadername
533                 value = self._headers[name] + ' ' + header_line.strip()
534             elif header_line.startswith('HTTP/'):
535                 name = 'x-status-line'
536                 value = header_line
537             else:
538                 _logger.error("Unexpected header line: %s", header_line)
539                 return
540             self._lastheadername = name
541             self._headers[name] = value
542             # Returning None implies all bytes were written
543     
544
    class KeepWriterQueue(queue.Queue):
        """Queue of (service, service_root) write targets for one block.

        Tracks how many replicas have been stored (successful_copies) and how
        many PUT attempts may be started before a result comes back
        (pending_tries, initially the number of wanted copies).  Worker
        threads claim targets with get_next_task() and report outcomes with
        write_success() or write_fail().
        """
        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            # Response value passed to the most recent write_success().
            self.response = None
            self.successful_copies_lock = threading.Lock()
            # Attempts that may still be dispatched without waiting.
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            """Record replicas_nr newly stored replicas and wake all waiting workers."""
            # The two locks are taken one after the other, never nested, so
            # this cannot deadlock with get_next_task(), which holds the
            # condition while pending_copies() takes successful_copies_lock.
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            """Allow one more attempt after a failed PUT and wake one waiter.

            ks (the service that failed) is not used here.
            """
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            """Return how many more replicas are wanted (zero or negative when done)."""
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            """Claim the next service to write to, blocking if necessary.

            Returns (service, service_root).  Raises queue.Empty when no more
            work should be started: either enough copies are stored (the
            remaining queued items are first drained and marked done), or no
            services are left to try.  Services whose finished() is true are
            skipped.  Waits while copies are still wanted, no attempt may be
            started (pending_tries is not positive) and services remain queued.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        # get_nowait() raises queue.Empty here if no
                        # services remain, which ends the caller's loop.
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()
596                     else:
597                         self.pending_tries_notification.wait()
598
599
600     class KeepWriterThreadPool(object):
601         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
602             self.total_task_nr = 0
603             self.wanted_copies = copies
604             if (not max_service_replicas) or (max_service_replicas >= copies):
605                 num_threads = 1
606             else:
607                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
608             _logger.debug("Pool max threads is %d", num_threads)
609             self.workers = []
610             self.queue = KeepClient.KeepWriterQueue(copies)
611             # Create workers
612             for _ in range(num_threads):
613                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
614                 self.workers.append(w)
615         
616         def add_task(self, ks, service_root):
617             self.queue.put((ks, service_root))
618             self.total_task_nr += 1
619         
620         def done(self):
621             return self.queue.successful_copies
622         
623         def join(self):
624             # Start workers
625             for worker in self.workers:
626                 worker.start()
627             # Wait for finished work
628             self.queue.join()
629         
630         def response(self):
631             return self.queue.response
632     
633     
634     class KeepWriterThread(threading.Thread):
635         TaskFailed = RuntimeError()
636
637         def __init__(self, queue, data, data_hash, timeout=None):
638             super(KeepClient.KeepWriterThread, self).__init__()
639             self.timeout = timeout
640             self.queue = queue
641             self.data = data
642             self.data_hash = data_hash
643             self.daemon = True
644
645         def run(self):
646             while True:
647                 try:
648                     service, service_root = self.queue.get_next_task()
649                 except queue.Empty:
650                     return
651                 try:
652                     locator, copies = self.do_task(service, service_root)
653                 except Exception as e:
654                     if e is not self.TaskFailed:
655                         _logger.exception("Exception in KeepWriterThread")
656                     self.queue.write_fail(service)
657                 else:
658                     self.queue.write_success(locator, copies)
659                 finally:
660                     self.queue.task_done()
661
662         def do_task(self, service, service_root):
663             success = bool(service.put(self.data_hash,
664                                         self.data,
665                                         timeout=self.timeout))
666             result = service.last_result()
667
668             if not success:
669                 if result.get('status_code', None):
670                     _logger.debug("Request fail: PUT %s => %s %s",
671                                   self.data_hash,
672                                   result['status_code'],
673                                   result['body'])
674                 raise self.TaskFailed
675
676             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
677                           str(threading.current_thread()),
678                           self.data_hash,
679                           len(self.data),
680                           service_root)
681             try:
682                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
683             except (KeyError, ValueError):
684                 replicas_stored = 1
685
686             return result['body'].strip(), replicas_stored
687
688
689     def __init__(self, api_client=None, proxy=None,
690                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
691                  api_token=None, local_store=None, block_cache=None,
692                  num_retries=0, session=None):
693         """Initialize a new KeepClient.
694
695         Arguments:
696         :api_client:
697           The API client to use to find Keep services.  If not
698           provided, KeepClient will build one from available Arvados
699           configuration.
700
701         :proxy:
702           If specified, this KeepClient will send requests to this Keep
703           proxy.  Otherwise, KeepClient will fall back to the setting of the
704           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
705           If you want to KeepClient does not use a proxy, pass in an empty
706           string.
707
708         :timeout:
709           The initial timeout (in seconds) for HTTP requests to Keep
710           non-proxy servers.  A tuple of three floats is interpreted as
711           (connection_timeout, read_timeout, minimum_bandwidth). A connection
712           will be aborted if the average traffic rate falls below
713           minimum_bandwidth bytes per second over an interval of read_timeout
714           seconds. Because timeouts are often a result of transient server
715           load, the actual connection timeout will be increased by a factor
716           of two on each retry.
717           Default: (2, 256, 32768).
718
719         :proxy_timeout:
720           The initial timeout (in seconds) for HTTP requests to
721           Keep proxies. A tuple of three floats is interpreted as
722           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
723           described above for adjusting connection timeouts on retry also
724           applies.
725           Default: (20, 256, 32768).
726
727         :api_token:
728           If you're not using an API client, but only talking
729           directly to a Keep proxy, this parameter specifies an API token
730           to authenticate Keep requests.  It is an error to specify both
731           api_client and api_token.  If you specify neither, KeepClient
732           will use one available from the Arvados configuration.
733
734         :local_store:
735           If specified, this KeepClient will bypass Keep
736           services, and save data to the named directory.  If unspecified,
737           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
738           environment variable.  If you want to ensure KeepClient does not
739           use local storage, pass in an empty string.  This is primarily
740           intended to mock a server for testing.
741
742         :num_retries:
743           The default number of times to retry failed requests.
744           This will be used as the default num_retries value when get() and
745           put() are called.  Default 0.
746         """
747         self.lock = threading.Lock()
748         if proxy is None:
749             if config.get('ARVADOS_KEEP_SERVICES'):
750                 proxy = config.get('ARVADOS_KEEP_SERVICES')
751             else:
752                 proxy = config.get('ARVADOS_KEEP_PROXY')
753         if api_token is None:
754             if api_client is None:
755                 api_token = config.get('ARVADOS_API_TOKEN')
756             else:
757                 api_token = api_client.api_token
758         elif api_client is not None:
759             raise ValueError(
760                 "can't build KeepClient with both API client and token")
761         if local_store is None:
762             local_store = os.environ.get('KEEP_LOCAL_STORE')
763
764         self.block_cache = block_cache if block_cache else KeepBlockCache()
765         self.timeout = timeout
766         self.proxy_timeout = proxy_timeout
767         self._user_agent_pool = queue.LifoQueue()
768         self.upload_counter = Counter()
769         self.download_counter = Counter()
770         self.put_counter = Counter()
771         self.get_counter = Counter()
772         self.hits_counter = Counter()
773         self.misses_counter = Counter()
774
775         if local_store:
776             self.local_store = local_store
777             self.get = self.local_store_get
778             self.put = self.local_store_put
779         else:
780             self.num_retries = num_retries
781             self.max_replicas_per_service = None
782             if proxy:
783                 proxy_uris = proxy.split()
784                 for i in range(len(proxy_uris)):
785                     if not proxy_uris[i].endswith('/'):
786                         proxy_uris[i] += '/'
787                     # URL validation
788                     url = urllib.parse.urlparse(proxy_uris[i])
789                     if not (url.scheme and url.netloc):
790                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
791                 self.api_token = api_token
792                 self._gateway_services = {}
793                 self._keep_services = [{
794                     'uuid': "00000-bi6l4-%015d" % idx,
795                     'service_type': 'proxy',
796                     '_service_root': uri,
797                     } for idx, uri in enumerate(proxy_uris)]
798                 self._writable_services = self._keep_services
799                 self.using_proxy = True
800                 self._static_services_list = True
801             else:
802                 # It's important to avoid instantiating an API client
803                 # unless we actually need one, for testing's sake.
804                 if api_client is None:
805                     api_client = arvados.api('v1')
806                 self.api_client = api_client
807                 self.api_token = api_client.api_token
808                 self._gateway_services = {}
809                 self._keep_services = None
810                 self._writable_services = None
811                 self.using_proxy = None
812                 self._static_services_list = False
813
814     def current_timeout(self, attempt_number):
815         """Return the appropriate timeout to use for this client.
816
817         The proxy timeout setting if the backend service is currently a proxy,
818         the regular timeout setting otherwise.  The `attempt_number` indicates
819         how many times the operation has been tried already (starting from 0
820         for the first try), and scales the connection timeout portion of the
821         return value accordingly.
822
823         """
824         # TODO(twp): the timeout should be a property of a
825         # KeepService, not a KeepClient. See #4488.
826         t = self.proxy_timeout if self.using_proxy else self.timeout
827         if len(t) == 2:
828             return (t[0] * (1 << attempt_number), t[1])
829         else:
830             return (t[0] * (1 << attempt_number), t[1], t[2])
831     def _any_nondisk_services(self, service_list):
832         return any(ks.get('service_type', 'disk') != 'disk'
833                    for ks in service_list)
834
835     def build_services_list(self, force_rebuild=False):
836         if (self._static_services_list or
837               (self._keep_services and not force_rebuild)):
838             return
839         with self.lock:
840             try:
841                 keep_services = self.api_client.keep_services().accessible()
842             except Exception:  # API server predates Keep services.
843                 keep_services = self.api_client.keep_disks().list()
844
845             # Gateway services are only used when specified by UUID,
846             # so there's nothing to gain by filtering them by
847             # service_type.
848             self._gateway_services = {ks['uuid']: ks for ks in
849                                       keep_services.execute()['items']}
850             if not self._gateway_services:
851                 raise arvados.errors.NoKeepServersError()
852
853             # Precompute the base URI for each service.
854             for r in self._gateway_services.values():
855                 host = r['service_host']
856                 if not host.startswith('[') and host.find(':') >= 0:
857                     # IPv6 URIs must be formatted like http://[::1]:80/...
858                     host = '[' + host + ']'
859                 r['_service_root'] = "{}://{}:{:d}/".format(
860                     'https' if r['service_ssl_flag'] else 'http',
861                     host,
862                     r['service_port'])
863
864             _logger.debug(str(self._gateway_services))
865             self._keep_services = [
866                 ks for ks in self._gateway_services.values()
867                 if not ks.get('service_type', '').startswith('gateway:')]
868             self._writable_services = [ks for ks in self._keep_services
869                                        if not ks.get('read_only')]
870
871             # For disk type services, max_replicas_per_service is 1
872             # It is unknown (unlimited) for other service types.
873             if self._any_nondisk_services(self._writable_services):
874                 self.max_replicas_per_service = None
875             else:
876                 self.max_replicas_per_service = 1
877
878     def _service_weight(self, data_hash, service_uuid):
879         """Compute the weight of a Keep service endpoint for a data
880         block with a known hash.
881
882         The weight is md5(h + u) where u is the last 15 characters of
883         the service endpoint's UUID.
884         """
885         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
886
887     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
888         """Return an array of Keep service endpoints, in the order in
889         which they should be probed when reading or writing data with
890         the given hash+hints.
891         """
892         self.build_services_list(force_rebuild)
893
894         sorted_roots = []
895         # Use the services indicated by the given +K@... remote
896         # service hints, if any are present and can be resolved to a
897         # URI.
898         for hint in locator.hints:
899             if hint.startswith('K@'):
900                 if len(hint) == 7:
901                     sorted_roots.append(
902                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
903                 elif len(hint) == 29:
904                     svc = self._gateway_services.get(hint[2:])
905                     if svc:
906                         sorted_roots.append(svc['_service_root'])
907
908         # Sort the available local services by weight (heaviest first)
909         # for this locator, and return their service_roots (base URIs)
910         # in that order.
911         use_services = self._keep_services
912         if need_writable:
913             use_services = self._writable_services
914         self.using_proxy = self._any_nondisk_services(use_services)
915         sorted_roots.extend([
916             svc['_service_root'] for svc in sorted(
917                 use_services,
918                 reverse=True,
919                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
920         _logger.debug("{}: {}".format(locator, sorted_roots))
921         return sorted_roots
922
923     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
924         # roots_map is a dictionary, mapping Keep service root strings
925         # to KeepService objects.  Poll for Keep services, and add any
926         # new ones to roots_map.  Return the current list of local
927         # root strings.
928         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
929         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
930         for root in local_roots:
931             if root not in roots_map:
932                 roots_map[root] = self.KeepService(
933                     root, self._user_agent_pool,
934                     upload_counter=self.upload_counter,
935                     download_counter=self.download_counter,
936                     **headers)
937         return local_roots
938
939     @staticmethod
940     def _check_loop_result(result):
941         # KeepClient RetryLoops should save results as a 2-tuple: the
942         # actual result of the request, and the number of servers available
943         # to receive the request this round.
944         # This method returns True if there's a real result, False if
945         # there are no more servers available, otherwise None.
946         if isinstance(result, Exception):
947             return None
948         result, tried_server_count = result
949         if (result is not None) and (result is not False):
950             return True
951         elif tried_server_count < 1:
952             _logger.info("No more Keep services to try; giving up")
953             return False
954         else:
955             return None
956
957     def get_from_cache(self, loc):
958         """Fetch a block only if is in the cache, otherwise return None."""
959         slot = self.block_cache.get(loc)
960         if slot is not None and slot.ready.is_set():
961             return slot.get()
962         else:
963             return None
964
965     @retry.retry_method
966     def head(self, loc_s, num_retries=None):
967         return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
968
969     @retry.retry_method
970     def get(self, loc_s, num_retries=None):
971         return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
972
973     def _get_or_head(self, loc_s, method="GET", num_retries=None):
974         """Get data from Keep.
975
976         This method fetches one or more blocks of data from Keep.  It
977         sends a request each Keep service registered with the API
978         server (or the proxy provided when this client was
979         instantiated), then each service named in location hints, in
980         sequence.  As soon as one service provides the data, it's
981         returned.
982
983         Arguments:
984         * loc_s: A string of one or more comma-separated locators to fetch.
985           This method returns the concatenation of these blocks.
986         * num_retries: The number of times to retry GET requests to
987           *each* Keep server if it returns temporary failures, with
988           exponential backoff.  Note that, in each loop, the method may try
989           to fetch data from every available Keep service, along with any
990           that are named in location hints in the locator.  The default value
991           is set when the KeepClient is initialized.
992         """
993         if ',' in loc_s:
994             return ''.join(self.get(x) for x in loc_s.split(','))
995
996         self.get_counter.add(1)
997
998         locator = KeepLocator(loc_s)
999         if method == "GET":
1000             slot, first = self.block_cache.reserve_cache(locator.md5sum)
1001             if not first:
1002                 self.hits_counter.add(1)
1003                 v = slot.get()
1004                 return v
1005
1006         self.misses_counter.add(1)
1007
1008         # If the locator has hints specifying a prefix (indicating a
1009         # remote keepproxy) or the UUID of a local gateway service,
1010         # read data from the indicated service(s) instead of the usual
1011         # list of local disk services.
1012         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1013                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1014         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1015                            for hint in locator.hints if (
1016                                    hint.startswith('K@') and
1017                                    len(hint) == 29 and
1018                                    self._gateway_services.get(hint[2:])
1019                                    )])
1020         # Map root URLs to their KeepService objects.
1021         roots_map = {
1022             root: self.KeepService(root, self._user_agent_pool,
1023                                    upload_counter=self.upload_counter,
1024                                    download_counter=self.download_counter)
1025             for root in hint_roots
1026         }
1027
1028         # See #3147 for a discussion of the loop implementation.  Highlights:
1029         # * Refresh the list of Keep services after each failure, in case
1030         #   it's being updated.
1031         # * Retry until we succeed, we're out of retries, or every available
1032         #   service has returned permanent failure.
1033         sorted_roots = []
1034         roots_map = {}
1035         blob = None
1036         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1037                                backoff_start=2)
1038         for tries_left in loop:
1039             try:
1040                 sorted_roots = self.map_new_services(
1041                     roots_map, locator,
1042                     force_rebuild=(tries_left < num_retries),
1043                     need_writable=False)
1044             except Exception as error:
1045                 loop.save_result(error)
1046                 continue
1047
1048             # Query KeepService objects that haven't returned
1049             # permanent failure, in our specified shuffle order.
1050             services_to_try = [roots_map[root]
1051                                for root in sorted_roots
1052                                if roots_map[root].usable()]
1053             for keep_service in services_to_try:
1054                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1055                 if blob is not None:
1056                     break
1057             loop.save_result((blob, len(services_to_try)))
1058
1059         # Always cache the result, then return it if we succeeded.
1060         if method == "GET":
1061             slot.set(blob)
1062             self.block_cache.cap_cache()
1063         if loop.success():
1064             if method == "HEAD":
1065                 return True
1066             else:
1067                 return blob
1068
1069         # Q: Including 403 is necessary for the Keep tests to continue
1070         # passing, but maybe they should expect KeepReadError instead?
1071         not_founds = sum(1 for key in sorted_roots
1072                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1073         service_errors = ((key, roots_map[key].last_result()['error'])
1074                           for key in sorted_roots)
1075         if not roots_map:
1076             raise arvados.errors.KeepReadError(
1077                 "failed to read {}: no Keep services available ({})".format(
1078                     loc_s, loop.last_result()))
1079         elif not_founds == len(sorted_roots):
1080             raise arvados.errors.NotFoundError(
1081                 "{} not found".format(loc_s), service_errors)
1082         else:
1083             raise arvados.errors.KeepReadError(
1084                 "failed to read {}".format(loc_s), service_errors, label="service")
1085
1086     @retry.retry_method
1087     def put(self, data, copies=2, num_retries=None):
1088         """Save data in Keep.
1089
1090         This method will get a list of Keep services from the API server, and
1091         send the data to each one simultaneously in a new thread.  Once the
1092         uploads are finished, if enough copies are saved, this method returns
1093         the most recent HTTP response body.  If requests fail to upload
1094         enough copies, this method raises KeepWriteError.
1095
1096         Arguments:
1097         * data: The string of data to upload.
1098         * copies: The number of copies that the user requires be saved.
1099           Default 2.
1100         * num_retries: The number of times to retry PUT requests to
1101           *each* Keep server if it returns temporary failures, with
1102           exponential backoff.  The default value is set when the
1103           KeepClient is initialized.
1104         """
1105
1106         if not isinstance(data, bytes):
1107             data = data.encode()
1108
1109         self.put_counter.add(1)
1110
1111         data_hash = hashlib.md5(data).hexdigest()
1112         loc_s = data_hash + '+' + str(len(data))
1113         if copies < 1:
1114             return loc_s
1115         locator = KeepLocator(loc_s)
1116
1117         headers = {}
1118         # Tell the proxy how many copies we want it to store
1119         headers['X-Keep-Desired-Replicas'] = str(copies)
1120         roots_map = {}
1121         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1122                                backoff_start=2)
1123         done = 0
1124         for tries_left in loop:
1125             try:
1126                 sorted_roots = self.map_new_services(
1127                     roots_map, locator,
1128                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1129             except Exception as error:
1130                 loop.save_result(error)
1131                 continue
1132
1133             writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
1134                                                         data_hash=data_hash,
1135                                                         copies=copies - done,
1136                                                         max_service_replicas=self.max_replicas_per_service,
1137                                                         timeout=self.current_timeout(num_retries - tries_left))
1138             for service_root, ks in [(root, roots_map[root])
1139                                      for root in sorted_roots]:
1140                 if ks.finished():
1141                     continue
1142                 writer_pool.add_task(ks, service_root)
1143             writer_pool.join()
1144             done += writer_pool.done()
1145             loop.save_result((done >= copies, writer_pool.total_task_nr))
1146
1147         if loop.success():
1148             return writer_pool.response()
1149         if not roots_map:
1150             raise arvados.errors.KeepWriteError(
1151                 "failed to write {}: no Keep services available ({})".format(
1152                     data_hash, loop.last_result()))
1153         else:
1154             service_errors = ((key, roots_map[key].last_result()['error'])
1155                               for key in sorted_roots
1156                               if roots_map[key].last_result()['error'])
1157             raise arvados.errors.KeepWriteError(
1158                 "failed to write {} (wanted {} copies but wrote {})".format(
1159                     data_hash, copies, writer_pool.done()), service_errors, label="service")
1160
1161     def local_store_put(self, data, copies=1, num_retries=None):
1162         """A stub for put().
1163
1164         This method is used in place of the real put() method when
1165         using local storage (see constructor's local_store argument).
1166
1167         copies and num_retries arguments are ignored: they are here
1168         only for the sake of offering the same call signature as
1169         put().
1170
1171         Data stored this way can be retrieved via local_store_get().
1172         """
1173         md5 = hashlib.md5(data).hexdigest()
1174         locator = '%s+%d' % (md5, len(data))
1175         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1176             f.write(data)
1177         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1178                   os.path.join(self.local_store, md5))
1179         return locator
1180
1181     def local_store_get(self, loc_s, num_retries=None):
1182         """Companion to local_store_put()."""
1183         try:
1184             locator = KeepLocator(loc_s)
1185         except ValueError:
1186             raise arvados.errors.NotFoundError(
1187                 "Invalid data locator: '%s'" % loc_s)
1188         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1189             return b''
1190         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1191             return f.read()
1192
1193     def is_cached(self, locator):
1194         return self.block_cache.reserve_cache(expect_hash)