1150bb6b4e52741e13fbfa3f1aa4f5935cb3468a
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import native_str
9 standard_library.install_aliases()
10 from builtins import next
11 from builtins import str
12 from builtins import range
13 from builtins import object
14 import collections
15 import datetime
16 import hashlib
17 import io
18 import logging
19 import math
20 import os
21 import pycurl
22 import queue
23 import re
24 import socket
25 import ssl
26 import sys
27 import threading
28 from . import timer
29 import urllib.parse
30
31 if sys.version_info >= (3, 0):
32     from io import BytesIO
33 else:
34     from cStringIO import StringIO as BytesIO
35
36 import arvados
37 import arvados.config as config
38 import arvados.errors
39 import arvados.retry as retry
40 import arvados.util
41
# Module-level logger shared by all Keep client code in this file.
_logger = logging.getLogger('arvados.keep')
# Cached KeepClient instance used by the deprecated Keep.* class methods;
# built (and rebuilt on config change) by Keep.global_client_object().
global_client_object = None
44
45
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
# These constants are used below when configuring TCP keepalive on sockets
# opened for pycurl (see KeepService._socket_open_pycurl_7_21_5); macOS's
# socket module historically did not define them.
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
55
56
class KeepLocator(object):
    """Parse, validate, and re-serialize a Keep block locator.

    A locator string looks like
    ``<md5sum>[+<size>][+A<signature>@<timestamp>][+<hint>...]``
    where the ``A...`` hint is a permission signature paired with a
    hex-encoded Unix-timestamp expiry.
    """

    # Reference point for converting perm_expiry back to a Unix timestamp.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A hint is one capital letter followed by alphanumerics (plus @, _, -).
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            # NOTE: if the piece after the hash is a hint rather than a
            # numeric size, int() raises ValueError (uncaught here), so
            # locators with hints but no size are rejected.
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Render the full locator, including size, signature, and hints."""
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with permission signature and hints removed."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # Accepts 1-8 hex digits; stored as a naive-UTC datetime.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A<sig>@<expiry>' hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an 'A<sig>@<expiry>' hint into perm_sig and perm_expiry.

        Raises ValueError("bad permission hint ...") if the hint has no
        '@' separator.  (The previous code caught IndexError here, which
        str.split never raises, so malformed hints surfaced a confusing
        tuple-unpacking ValueError instead of this message.)
        """
        parts = s[1:].split('@', 1)
        if len(parts) != 2:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig, self.perm_expiry = parts

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        Compares against as_of_dt when given, otherwise the current UTC
        time.  perm_expiry is stored as a naive UTC datetime, so the
        default must be utcnow(); the previous datetime.now() compared
        UTC against local time, skewing expiry by the timezone offset.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
139
140
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """

    # Snapshot of the configuration used to build the cached client, so we
    # can detect when a rebuild is needed.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if config changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        settings = (config.get('ARVADOS_API_HOST'),
                    config.get('ARVADOS_API_TOKEN'),
                    config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                    config.get('ARVADOS_KEEP_PROXY'),
                    config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                    os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is None or settings != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = settings
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared client."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the shared client."""
        return Keep.global_client_object().put(data, **kwargs)
175
class KeepBlockCache(object):
    """In-memory LRU cache of Keep block contents.

    Slots are kept in a list ordered most-recently-used first.  Each
    CacheSlot doubles as a synchronization point: the thread fetching a
    block fills it with set(), and other threads wait on get().
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        # cache_max: approximate cap, in bytes, on total cached content.
        self.cache_max = cache_max
        self._cache = []  # CacheSlots, most recently used first
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cached block: its locator, content, and a readiness event."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until content has been set, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block content and wake any get() waiters."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the content size in bytes (0 if unset or failed)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Drop slots that are ready but have no content: those mark
            # blocks whose read failed.
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least-recently-used slot whose content has
                # arrived; never evict a slot a reader may be waiting on.
                for i in range(len(self._cache) - 1, -1, -1):
                    if self._cache[i].ready.is_set():
                        # Subtract the evicted size instead of re-summing
                        # the whole list on every pass (was O(n) per
                        # eviction).
                        total -= self._cache[i].size()
                        del self._cache[i]
                        break
                else:
                    # No evictable slot right now.  The previous version
                    # looped forever in this state; bail out and let a
                    # later cap_cache() call finish the job.
                    break

    def _get(self, locator):
        # Linear scan; caller must hold self._cache_lock.
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # Move the hit to the front (most recently used).
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the CacheSlot for locator, or None if not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
247
class Counter(object):
    """A thread-safe additive counter (used for transfer statistics)."""

    def __init__(self, v=0):
        # Guards every read and write of the running total.
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Atomically add v (which may be negative) to the counter."""
        with self._lk:
            self._val = self._val + v

    def get(self):
        """Return the current total atomically."""
        with self._lk:
            return self._val
260
261
262 class KeepClient(object):
263
264     # Default Keep server connection timeout:  2 seconds
265     # Default Keep server read timeout:       256 seconds
266     # Default Keep server bandwidth minimum:  32768 bytes per second
267     # Default Keep proxy connection timeout:  20 seconds
268     # Default Keep proxy read timeout:        256 seconds
269     # Default Keep proxy bandwidth minimum:   32768 bytes per second
270     DEFAULT_TIMEOUT = (2, 256, 32768)
271     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
272
273
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        # Exception types treated as (possibly transient) request failures
        # rather than programming errors; get()/put() catch these and record
        # them in self._result['error'].
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None,
                     headers={}):
            # NOTE(review): both default arguments are mutable objects shared
            # across all instances; in particular every KeepService built
            # without an explicit pool recycles pycurl handles through the
            # same module-wide LifoQueue.  Confirm this sharing is intended.
            #
            # :root: base URL of the Keep service (trailing slash included by
            #   the caller; locators/hashes are appended directly).
            # :user_agent_pool: LifoQueue of reusable pycurl.Curl handles.
            # :upload_counter:/:download_counter: optional Counter objects
            #   accumulating bytes transferred.
            # :headers: extra HTTP headers (e.g. Authorization) sent on every
            #   request.
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            # _session is never used in this class as shown -- presumably a
            # leftover; verify before removing.
            self._session = None
            self._socket = None
            # GETs advertise octet-stream on top of the caller's headers.
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # Dict with 'error' plus, on a completed request, 'status_code',
            # 'body', and 'headers'.
            return self._result

        def _get_user_agent(self):
            # Reuse a pooled pycurl handle if one is available, else make one.
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            # Return a handle to the pool after resetting it; if anything goes
            # wrong (including a full pool), just close it instead.
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            # Dispatch on arity: pycurl >= 7.21.5 calls the open-socket
            # callback with (purpose, address); older versions pass the four
            # raw socket parameters.
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            # Adapt the old callback signature to the new one by packing the
            # parameters into an Address-like namedtuple.
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            # Open the socket ourselves so we can enable TCP keepalive;
            # the socket is remembered in self._socket so get()/put() can
            # close it after curl.perform() returns.
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            """Fetch a block (GET) or probe for it (HEAD).

            Returns the block content (bytes) on a successful GET, True on a
            successful HEAD, or None on failure; details are recorded in
            self._result either way.
            """
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            # ok stays None if we never got far enough to evaluate the HTTP
            # status; _usable is only cleared when ok is exactly False.
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    # NOTE(review): self.insecure is not set in __init__;
                    # presumably the owning KeepClient assigns it after
                    # construction -- confirm against the caller.
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)

                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize any curl-level failure to HttpError so the
                        # outer except clause handles it uniformly.
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        # Close the socket we opened in _socket_open; curl is
                        # done with it after perform().
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                # NOTE(review): _result has no 'content-length' key (lengths
                # live under _result['headers']), so this always logs None --
                # looks like a logging bug.
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result.get('content-length'))
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload against the locator's md5 before trusting it.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            """Upload one block; return True on success, False on failure.

            :hash_s: md5 hex digest of body (forms the request path).
            :body: the block content (bytes).
            """
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    # NOTE(review): as in get(), self.insecure is assigned
                    # externally -- confirm.
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        # PUT responses are text (a signed locator), unlike
                        # GET responses which stay raw bytes.
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            """Apply connect/transfer timeouts to a curl handle.

            :timeouts: None (leave curl defaults), a single number (used for
            both connect and transfer), a (connect, transfer) pair, or a
            (connect, transfer, bandwidth_bps) triple.  The transfer timeout
            is enforced as a low-speed limit: abort if throughput stays under
            bandwidth_bps for that many seconds.
            """
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            """pycurl HEADERFUNCTION callback: accumulate response headers.

            Header names are lowercased into self._headers; a leading-
            whitespace continuation line is appended to the previous header's
            value, and the status line is stored under 'x-status-line'.
            """
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # No colon and we've seen headers already: treat as a folded
                # continuation of the previous header line.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
548
549
    class KeepWriterQueue(queue.Queue):
        """Task queue shared by the KeepWriterThreads storing one block.

        Besides the inherited queue of (service, service_root) tasks, this
        tracks how many replica writes have succeeded and limits how many
        write attempts may be in flight via pending_tries, coordinated
        through a condition variable.
        """

        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            # Number of replicas the caller asked for.
            self.wanted_copies = copies
            self.successful_copies = 0
            # Body of the last successful PUT (the signed locator).
            self.response = None
            self.successful_copies_lock = threading.Lock()
            # Attempt budget: starts at the wanted replica count and grows
            # by one each time a write fails, so another service gets a try.
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            """Record that one service stored replicas_nr replicas."""
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                # Wake every waiter: the goal may now be reached.
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # ks (the failed service) is accepted but unused here; failure
            # just frees one more attempt for some other service.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            """Return how many more successful replicas are still needed."""
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            """Return the next (service, service_root) task to attempt.

            Blocks until an attempt slot is available.  Raises queue.Empty
            once enough copies are stored (after draining remaining tasks)
            or when no tasks remain.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # This service already succeeded or failed
                            # permanently; skip it.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Tasks remain but no attempt slots; wait for a
                        # success or failure notification.
                        self.pending_tries_notification.wait()
603
604
605     class KeepWriterThreadPool(object):
606         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
607             self.total_task_nr = 0
608             self.wanted_copies = copies
609             if (not max_service_replicas) or (max_service_replicas >= copies):
610                 num_threads = 1
611             else:
612                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
613             _logger.debug("Pool max threads is %d", num_threads)
614             self.workers = []
615             self.queue = KeepClient.KeepWriterQueue(copies)
616             # Create workers
617             for _ in range(num_threads):
618                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
619                 self.workers.append(w)
620
621         def add_task(self, ks, service_root):
622             self.queue.put((ks, service_root))
623             self.total_task_nr += 1
624
625         def done(self):
626             return self.queue.successful_copies
627
628         def join(self):
629             # Start workers
630             for worker in self.workers:
631                 worker.start()
632             # Wait for finished work
633             self.queue.join()
634
635         def response(self):
636             return self.queue.response
637
638
    class KeepWriterThread(threading.Thread):
        # Sentinel exception instance: run() compares caught exceptions to
        # this object *by identity* to distinguish an expected task failure
        # (already logged in do_task) from an unexpected crash.
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            """Worker that pulls (service, root) tasks and PUTs one block.

            :queue: the KeepWriterQueue shared with the other workers.
            :data: block content to store.
            :data_hash: md5 hex digest of data (forms the request path).
            :timeout: passed through to KeepService.put() per request.
            """
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            # Daemon thread: don't block interpreter exit.
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    # Enough copies stored, or no services left to try.
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    # TaskFailed was already logged in do_task; anything
                    # else is unexpected and gets a traceback.
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """PUT the block to one service.

            Returns (locator, replicas_stored) on success; raises TaskFailed
            after logging if the service reported failure.
            """
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                # The server reports how many replicas it actually wrote;
                # assume 1 if the header is missing or malformed.
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored
692
693
694     def __init__(self, api_client=None, proxy=None,
695                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
696                  api_token=None, local_store=None, block_cache=None,
697                  num_retries=0, session=None):
698         """Initialize a new KeepClient.
699
700         Arguments:
701         :api_client:
702           The API client to use to find Keep services.  If not
703           provided, KeepClient will build one from available Arvados
704           configuration.
705
706         :proxy:
707           If specified, this KeepClient will send requests to this Keep
708           proxy.  Otherwise, KeepClient will fall back to the setting of the
709           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
710           If you want to KeepClient does not use a proxy, pass in an empty
711           string.
712
713         :timeout:
714           The initial timeout (in seconds) for HTTP requests to Keep
715           non-proxy servers.  A tuple of three floats is interpreted as
716           (connection_timeout, read_timeout, minimum_bandwidth). A connection
717           will be aborted if the average traffic rate falls below
718           minimum_bandwidth bytes per second over an interval of read_timeout
719           seconds. Because timeouts are often a result of transient server
720           load, the actual connection timeout will be increased by a factor
721           of two on each retry.
722           Default: (2, 256, 32768).
723
724         :proxy_timeout:
725           The initial timeout (in seconds) for HTTP requests to
726           Keep proxies. A tuple of three floats is interpreted as
727           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
728           described above for adjusting connection timeouts on retry also
729           applies.
730           Default: (20, 256, 32768).
731
732         :api_token:
733           If you're not using an API client, but only talking
734           directly to a Keep proxy, this parameter specifies an API token
735           to authenticate Keep requests.  It is an error to specify both
736           api_client and api_token.  If you specify neither, KeepClient
737           will use one available from the Arvados configuration.
738
739         :local_store:
740           If specified, this KeepClient will bypass Keep
741           services, and save data to the named directory.  If unspecified,
742           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
743           environment variable.  If you want to ensure KeepClient does not
744           use local storage, pass in an empty string.  This is primarily
745           intended to mock a server for testing.
746
747         :num_retries:
748           The default number of times to retry failed requests.
749           This will be used as the default num_retries value when get() and
750           put() are called.  Default 0.
751         """
752         self.lock = threading.Lock()
753         if proxy is None:
754             if config.get('ARVADOS_KEEP_SERVICES'):
755                 proxy = config.get('ARVADOS_KEEP_SERVICES')
756             else:
757                 proxy = config.get('ARVADOS_KEEP_PROXY')
758         if api_token is None:
759             if api_client is None:
760                 api_token = config.get('ARVADOS_API_TOKEN')
761             else:
762                 api_token = api_client.api_token
763         elif api_client is not None:
764             raise ValueError(
765                 "can't build KeepClient with both API client and token")
766         if local_store is None:
767             local_store = os.environ.get('KEEP_LOCAL_STORE')
768
769         if config.flag_is_true('ARVADOS_API_HOST_INSECURE'):
770             self.insecure = True
771         else:
772             self.insecure = False
773
774         self.block_cache = block_cache if block_cache else KeepBlockCache()
775         self.timeout = timeout
776         self.proxy_timeout = proxy_timeout
777         self._user_agent_pool = queue.LifoQueue()
778         self.upload_counter = Counter()
779         self.download_counter = Counter()
780         self.put_counter = Counter()
781         self.get_counter = Counter()
782         self.hits_counter = Counter()
783         self.misses_counter = Counter()
784
785         if local_store:
786             self.local_store = local_store
787             self.get = self.local_store_get
788             self.put = self.local_store_put
789         else:
790             self.num_retries = num_retries
791             self.max_replicas_per_service = None
792             if proxy:
793                 proxy_uris = proxy.split()
794                 for i in range(len(proxy_uris)):
795                     if not proxy_uris[i].endswith('/'):
796                         proxy_uris[i] += '/'
797                     # URL validation
798                     url = urllib.parse.urlparse(proxy_uris[i])
799                     if not (url.scheme and url.netloc):
800                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
801                 self.api_token = api_token
802                 self._gateway_services = {}
803                 self._keep_services = [{
804                     'uuid': "00000-bi6l4-%015d" % idx,
805                     'service_type': 'proxy',
806                     '_service_root': uri,
807                     } for idx, uri in enumerate(proxy_uris)]
808                 self._writable_services = self._keep_services
809                 self.using_proxy = True
810                 self._static_services_list = True
811             else:
812                 # It's important to avoid instantiating an API client
813                 # unless we actually need one, for testing's sake.
814                 if api_client is None:
815                     api_client = arvados.api('v1')
816                 self.api_client = api_client
817                 self.api_token = api_client.api_token
818                 self._gateway_services = {}
819                 self._keep_services = None
820                 self._writable_services = None
821                 self.using_proxy = None
822                 self._static_services_list = False
823
824     def current_timeout(self, attempt_number):
825         """Return the appropriate timeout to use for this client.
826
827         The proxy timeout setting if the backend service is currently a proxy,
828         the regular timeout setting otherwise.  The `attempt_number` indicates
829         how many times the operation has been tried already (starting from 0
830         for the first try), and scales the connection timeout portion of the
831         return value accordingly.
832
833         """
834         # TODO(twp): the timeout should be a property of a
835         # KeepService, not a KeepClient. See #4488.
836         t = self.proxy_timeout if self.using_proxy else self.timeout
837         if len(t) == 2:
838             return (t[0] * (1 << attempt_number), t[1])
839         else:
840             return (t[0] * (1 << attempt_number), t[1], t[2])
841     def _any_nondisk_services(self, service_list):
842         return any(ks.get('service_type', 'disk') != 'disk'
843                    for ks in service_list)
844
    def build_services_list(self, force_rebuild=False):
        """Fetch and cache the list of Keep services from the API server.

        Returns immediately when the service list is static (a proxy was
        configured) or already cached, unless force_rebuild is true.
        Populates self._gateway_services, self._keep_services,
        self._writable_services, and self.max_replicas_per_service.

        Raises arvados.errors.NoKeepServersError if the API server
        reports no accessible services.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Local probe candidates exclude gateway-only services, which
            # are reachable solely through +K@uuid locator hints.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
887
888     def _service_weight(self, data_hash, service_uuid):
889         """Compute the weight of a Keep service endpoint for a data
890         block with a known hash.
891
892         The weight is md5(h + u) where u is the last 15 characters of
893         the service endpoint's UUID.
894         """
895         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
896
897     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
898         """Return an array of Keep service endpoints, in the order in
899         which they should be probed when reading or writing data with
900         the given hash+hints.
901         """
902         self.build_services_list(force_rebuild)
903
904         sorted_roots = []
905         # Use the services indicated by the given +K@... remote
906         # service hints, if any are present and can be resolved to a
907         # URI.
908         for hint in locator.hints:
909             if hint.startswith('K@'):
910                 if len(hint) == 7:
911                     sorted_roots.append(
912                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
913                 elif len(hint) == 29:
914                     svc = self._gateway_services.get(hint[2:])
915                     if svc:
916                         sorted_roots.append(svc['_service_root'])
917
918         # Sort the available local services by weight (heaviest first)
919         # for this locator, and return their service_roots (base URIs)
920         # in that order.
921         use_services = self._keep_services
922         if need_writable:
923             use_services = self._writable_services
924         self.using_proxy = self._any_nondisk_services(use_services)
925         sorted_roots.extend([
926             svc['_service_root'] for svc in sorted(
927                 use_services,
928                 reverse=True,
929                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
930         _logger.debug("{}: {}".format(locator, sorted_roots))
931         return sorted_roots
932
933     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
934         # roots_map is a dictionary, mapping Keep service root strings
935         # to KeepService objects.  Poll for Keep services, and add any
936         # new ones to roots_map.  Return the current list of local
937         # root strings.
938         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
939         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
940         for root in local_roots:
941             if root not in roots_map:
942                 roots_map[root] = self.KeepService(
943                     root, self._user_agent_pool,
944                     upload_counter=self.upload_counter,
945                     download_counter=self.download_counter,
946                     headers=headers)
947         return local_roots
948
949     @staticmethod
950     def _check_loop_result(result):
951         # KeepClient RetryLoops should save results as a 2-tuple: the
952         # actual result of the request, and the number of servers available
953         # to receive the request this round.
954         # This method returns True if there's a real result, False if
955         # there are no more servers available, otherwise None.
956         if isinstance(result, Exception):
957             return None
958         result, tried_server_count = result
959         if (result is not None) and (result is not False):
960             return True
961         elif tried_server_count < 1:
962             _logger.info("No more Keep services to try; giving up")
963             return False
964         else:
965             return None
966
967     def get_from_cache(self, loc):
968         """Fetch a block only if is in the cache, otherwise return None."""
969         slot = self.block_cache.get(loc)
970         if slot is not None and slot.ready.is_set():
971             return slot.get()
972         else:
973             return None
974
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Check whether a block is available in Keep without fetching it.

        Takes the same arguments as get(); returns True on success (see
        _get_or_head for details and raised errors).
        """
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
978
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch a block of data from Keep.

        Takes the same arguments as _get_or_head(), which documents the
        behavior, arguments, and raised errors.
        """
        return self._get_or_head(loc_s, method="GET", **kwargs)
982
983     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None):
984         """Get data from Keep.
985
986         This method fetches one or more blocks of data from Keep.  It
987         sends a request each Keep service registered with the API
988         server (or the proxy provided when this client was
989         instantiated), then each service named in location hints, in
990         sequence.  As soon as one service provides the data, it's
991         returned.
992
993         Arguments:
994         * loc_s: A string of one or more comma-separated locators to fetch.
995           This method returns the concatenation of these blocks.
996         * num_retries: The number of times to retry GET requests to
997           *each* Keep server if it returns temporary failures, with
998           exponential backoff.  Note that, in each loop, the method may try
999           to fetch data from every available Keep service, along with any
1000           that are named in location hints in the locator.  The default value
1001           is set when the KeepClient is initialized.
1002         """
1003         if ',' in loc_s:
1004             return ''.join(self.get(x) for x in loc_s.split(','))
1005
1006         self.get_counter.add(1)
1007
1008         slot = None
1009         blob = None
1010         try:
1011             locator = KeepLocator(loc_s)
1012             if method == "GET":
1013                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1014                 if not first:
1015                     self.hits_counter.add(1)
1016                     blob = slot.get()
1017                     if blob is None:
1018                         raise arvados.errors.KeepReadError(
1019                             "failed to read {}".format(loc_s))
1020                     return blob
1021
1022             self.misses_counter.add(1)
1023
1024             headers = {
1025                 'X-Request-Id': (request_id or
1026                                  (hasattr(self, 'api_client') and self.api_client.request_id) or
1027                                  arvados.util.new_request_id()),
1028             }
1029
1030             # If the locator has hints specifying a prefix (indicating a
1031             # remote keepproxy) or the UUID of a local gateway service,
1032             # read data from the indicated service(s) instead of the usual
1033             # list of local disk services.
1034             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1035                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1036             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1037                                for hint in locator.hints if (
1038                                        hint.startswith('K@') and
1039                                        len(hint) == 29 and
1040                                        self._gateway_services.get(hint[2:])
1041                                        )])
1042             # Map root URLs to their KeepService objects.
1043             roots_map = {
1044                 root: self.KeepService(root, self._user_agent_pool,
1045                                        upload_counter=self.upload_counter,
1046                                        download_counter=self.download_counter,
1047                                        headers=headers)
1048                 for root in hint_roots
1049             }
1050
1051             # See #3147 for a discussion of the loop implementation.  Highlights:
1052             # * Refresh the list of Keep services after each failure, in case
1053             #   it's being updated.
1054             # * Retry until we succeed, we're out of retries, or every available
1055             #   service has returned permanent failure.
1056             sorted_roots = []
1057             roots_map = {}
1058             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1059                                    backoff_start=2)
1060             for tries_left in loop:
1061                 try:
1062                     sorted_roots = self.map_new_services(
1063                         roots_map, locator,
1064                         force_rebuild=(tries_left < num_retries),
1065                         need_writable=False,
1066                         headers=headers)
1067                 except Exception as error:
1068                     loop.save_result(error)
1069                     continue
1070
1071                 # Query KeepService objects that haven't returned
1072                 # permanent failure, in our specified shuffle order.
1073                 services_to_try = [roots_map[root]
1074                                    for root in sorted_roots
1075                                    if roots_map[root].usable()]
1076                 for keep_service in services_to_try:
1077                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1078                     if blob is not None:
1079                         break
1080                 loop.save_result((blob, len(services_to_try)))
1081
1082             # Always cache the result, then return it if we succeeded.
1083             if loop.success():
1084                 if method == "HEAD":
1085                     return True
1086                 else:
1087                     return blob
1088         finally:
1089             if slot is not None:
1090                 slot.set(blob)
1091                 self.block_cache.cap_cache()
1092
1093         # Q: Including 403 is necessary for the Keep tests to continue
1094         # passing, but maybe they should expect KeepReadError instead?
1095         not_founds = sum(1 for key in sorted_roots
1096                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1097         service_errors = ((key, roots_map[key].last_result()['error'])
1098                           for key in sorted_roots)
1099         if not roots_map:
1100             raise arvados.errors.KeepReadError(
1101                 "failed to read {}: no Keep services available ({})".format(
1102                     loc_s, loop.last_result()))
1103         elif not_founds == len(sorted_roots):
1104             raise arvados.errors.NotFoundError(
1105                 "{} not found".format(loc_s), service_errors)
1106         else:
1107             raise arvados.errors.KeepReadError(
1108                 "failed to read {}".format(loc_s), service_errors, label="service")
1109
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if not isinstance(data, bytes):
            # Keep blocks are bytes; encode text input transparently.
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Zero copies requested: nothing to store, but the locator
            # is still well-defined and returned.
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {
            'X-Request-Id': (request_id or
                             (hasattr(self, 'api_client') and self.api_client.request_id) or
                             arvados.util.new_request_id()),
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        # Retry until enough copies are written, retries are exhausted,
        # or every writable service has failed permanently.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0  # number of copies confirmed written so far
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Fan the upload out across services in probe order; only
            # request the copies still missing from earlier rounds.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    # This service already accepted the block.
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
1189
1190     def local_store_put(self, data, copies=1, num_retries=None):
1191         """A stub for put().
1192
1193         This method is used in place of the real put() method when
1194         using local storage (see constructor's local_store argument).
1195
1196         copies and num_retries arguments are ignored: they are here
1197         only for the sake of offering the same call signature as
1198         put().
1199
1200         Data stored this way can be retrieved via local_store_get().
1201         """
1202         md5 = hashlib.md5(data).hexdigest()
1203         locator = '%s+%d' % (md5, len(data))
1204         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1205             f.write(data)
1206         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1207                   os.path.join(self.local_store, md5))
1208         return locator
1209
1210     def local_store_get(self, loc_s, num_retries=None):
1211         """Companion to local_store_put()."""
1212         try:
1213             locator = KeepLocator(loc_s)
1214         except ValueError:
1215             raise arvados.errors.NotFoundError(
1216                 "Invalid data locator: '%s'" % loc_s)
1217         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1218             return b''
1219         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1220             return f.read()
1221
1222     def is_cached(self, locator):
1223         return self.block_cache.reserve_cache(expect_hash)