Merge branch '14595-leave-modified-at' refs #14595
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 from future import standard_library
8 from future.utils import native_str
9 standard_library.install_aliases()
10 from builtins import next
11 from builtins import str
12 from builtins import range
13 from builtins import object
14 import collections
15 import datetime
16 import hashlib
17 import io
18 import logging
19 import math
20 import os
21 import pycurl
22 import queue
23 import re
24 import socket
25 import ssl
26 import sys
27 import threading
28 from . import timer
29 import urllib.parse
30
31 if sys.version_info >= (3, 0):
32     from io import BytesIO
33 else:
34     from cStringIO import StringIO as BytesIO
35
36 import arvados
37 import arvados.config as config
38 import arvados.errors
39 import arvados.retry as retry
40 import arvados.util
41
# Module logger; configure via the standard logging framework ('arvados.keep').
_logger = logging.getLogger('arvados.keep')
# Lazily-built singleton KeepClient used by the deprecated Keep class;
# created and refreshed by Keep.global_client_object().
global_client_object = None
44
45
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
# These constants are used below when enabling TCP keepalive on the sockets
# opened for Keep requests.
if sys.platform == 'darwin':
    # hasattr guards keep this future-proof: if a newer Python exposes these
    # constants on macOS, the stdlib values win.
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
55
56
class KeepLocator(object):
    """One Keep block locator: "<md5 hex>[+<size>][+<hint>...]".

    Parses the '+'-separated fields of a locator string.  Hints starting
    with 'A' carry a permission signature and expiry and are parsed into
    perm_sig / perm_expiry; every other hint is preserved verbatim in
    self.hints.  str() reassembles an equivalent locator string.
    """

    # Reference point for converting perm_expiry back to a Unix timestamp.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A hint is one capital letter followed by alphanumerics / '@' / '_' / '-'.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str; raise ValueError if any field is malformed."""
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints ("md5+size", or just "md5")."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        # Expiry of the permission signature as a naive UTC datetime,
        # or None when the locator carried no 'A' hint.
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Rebuild the 'A<sig>@<hex expiry>' hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an 'A<signature>@<hex timestamp>' permission hint."""
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # BUGFIX: a hint without '@' makes the tuple unpacking above
            # raise ValueError (split returns a 1-element list), not
            # IndexError, so the original "except IndexError" never fired.
            # Catch both so malformed hints always get this message.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        Compares against as_of_dt when given, otherwise the current time.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # BUGFIX: perm_expiry is a naive UTC datetime (built with
            # utcfromtimestamp), so it must be compared with UTC "now";
            # datetime.now() would be off by the local UTC offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
139
140
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if configuration changed."""
        global global_client_object
        # KeepClient used to adapt to these settings at runtime; emulate that
        # by building a fresh client whenever any of them has changed since
        # the last call.
        key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is None or key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch one block through the shared client (deprecated)."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store one block through the shared client (deprecated)."""
        return Keep.global_client_object().put(data, **kwargs)
175
class KeepBlockCache(object):
    """Thread-safe in-memory cache of Keep block contents.

    Slots are kept in a list ordered most-recently-used first, and the
    total cached bytes are (softly) capped at cache_max.
    # Default RAM cache is 256MiB
    """

    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry: a locator and its (eventual) content.

        A slot is created before the block arrives; readers block in
        get() until the fetcher calls set().  content stays None after
        ready is set when the fetch failed.
        """
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the content is available, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block data and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Bytes held by this slot (0 while pending or after an error)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict the least-recently-used slot whose content has
                # arrived (never one a reader may still be waiting on).
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                else:
                    # BUGFIX: every remaining slot is still pending, so
                    # nothing can be evicted and sm cannot shrink; bail out
                    # instead of spinning in this loop forever.
                    break
                sm = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Caller must hold _cache_lock.  Linear scan; a hit is moved to
        # the front so the list stays in MRU order.
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the slot cached for locator, or None."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
247
class Counter(object):
    """Thread-safe additive counter (used for transfer statistics)."""

    def __init__(self, v=0):
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically increase the count by v (which may be negative)."""
        with self._lock:
            self._value += v

    def get(self):
        """Return a consistent snapshot of the current count."""
        with self._lock:
            return self._value
260
261
262 class KeepClient(object):
263
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    # Each tuple is (connect_timeout_secs, read_timeout_secs, min_bandwidth_bps),
    # consumed by KeepService._setcurltimeouts().
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
272
273
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        # Exception types treated as HTTP-level failures: get()/put()
        # catch these, record them in _result['error'], and leave the
        # retry decision to the caller.
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )
291
292         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
293                      upload_counter=None,
294                      download_counter=None,
295                      headers={},
296                      insecure=False):
297             self.root = root
298             self._user_agent_pool = user_agent_pool
299             self._result = {'error': None}
300             self._usable = True
301             self._session = None
302             self._socket = None
303             self.get_headers = {'Accept': 'application/octet-stream'}
304             self.get_headers.update(headers)
305             self.put_headers = headers
306             self.upload_counter = upload_counter
307             self.download_counter = download_counter
308             self.insecure = insecure
309
        def usable(self):
            """Is it worth attempting a request?"""
            # Becomes False once a permanent (non-retryable) failure
            # has been recorded by get() or put().
            return self._usable
313
        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            # _result['error'] starts as None, is set to False exactly when
            # a request succeeded, and holds an exception after a failure.
            return self._result['error'] == False or not self._usable
317
        def last_result(self):
            # Dict describing the most recent request: always 'error', plus
            # 'status_code', 'body' and 'headers' when an HTTP response
            # was received.
            return self._result
320
        def _get_user_agent(self):
            # Reuse a pooled curl handle when one is free; otherwise create
            # a fresh one (EAFP on the non-blocking get).
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()
326
327         def _put_user_agent(self, ua):
328             try:
329                 ua.reset()
330                 self._user_agent_pool.put(ua, block=False)
331             except:
332                 ua.close()
333
        def _socket_open(self, *args, **kwargs):
            """CURLOPT_OPENSOCKETFUNCTION callback entry point.

            pycurl changed this callback's signature in 7.21.5, so
            dispatch on the number of arguments received.
            """
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
339
        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            # Adapt the old (pre-7.21.5) callback signature to the new one
            # by packing the four pieces into an object with the same
            # attributes newer pycurl passes.
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))
346
        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            # Open the socket ourselves so we can turn on TCP keepalive,
            # which pycurl exposes no option for.
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            # TCP_KEEPINTVL needs no guard: it is monkey-patched onto the
            # socket module at import time on macOS (see top of file).
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            # Keep a reference so get()/put() can close it after perform().
            self._socket = s
            return s
357
358         def get(self, locator, method="GET", timeout=None):
359             # locator is a KeepLocator object.
360             url = self.root + str(locator)
361             _logger.debug("Request: %s %s", method, url)
362             curl = self._get_user_agent()
363             ok = None
364             try:
365                 with timer.Timer() as t:
366                     self._headers = {}
367                     response_body = BytesIO()
368                     curl.setopt(pycurl.NOSIGNAL, 1)
369                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
370                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
371                     curl.setopt(pycurl.URL, url.encode('utf-8'))
372                     curl.setopt(pycurl.HTTPHEADER, [
373                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
374                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
375                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
376                     if self.insecure:
377                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
378                     if method == "HEAD":
379                         curl.setopt(pycurl.NOBODY, True)
380                     self._setcurltimeouts(curl, timeout, method=="HEAD")
381
382                     try:
383                         curl.perform()
384                     except Exception as e:
385                         raise arvados.errors.HttpError(0, str(e))
386                     finally:
387                         if self._socket:
388                             self._socket.close()
389                             self._socket = None
390                     self._result = {
391                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
392                         'body': response_body.getvalue(),
393                         'headers': self._headers,
394                         'error': False,
395                     }
396
397                 ok = retry.check_http_response_success(self._result['status_code'])
398                 if not ok:
399                     self._result['error'] = arvados.errors.HttpError(
400                         self._result['status_code'],
401                         self._headers.get('x-status-line', 'Error'))
402             except self.HTTP_ERRORS as e:
403                 self._result = {
404                     'error': e,
405                 }
406             self._usable = ok != False
407             if self._result.get('status_code', None):
408                 # The client worked well enough to get an HTTP status
409                 # code, so presumably any problems are just on the
410                 # server side and it's OK to reuse the client.
411                 self._put_user_agent(curl)
412             else:
413                 # Don't return this client to the pool, in case it's
414                 # broken.
415                 curl.close()
416             if not ok:
417                 _logger.debug("Request fail: GET %s => %s: %s",
418                               url, type(self._result['error']), str(self._result['error']))
419                 return None
420             if method == "HEAD":
421                 _logger.info("HEAD %s: %s bytes",
422                          self._result['status_code'],
423                          self._result.get('content-length'))
424                 if self._result['headers'].get('x-keep-locator'):
425                     # This is a response to a remote block copy request, return
426                     # the local copy block locator.
427                     return self._result['headers'].get('x-keep-locator')
428                 return True
429
430             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
431                          self._result['status_code'],
432                          len(self._result['body']),
433                          t.msecs,
434                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
435
436             if self.download_counter:
437                 self.download_counter.add(len(self._result['body']))
438             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
439             if resp_md5 != locator.md5sum:
440                 _logger.warning("Checksum fail: md5(%s) = %s",
441                                 url, resp_md5)
442                 self._result['error'] = arvados.errors.HttpError(
443                     0, 'Checksum fail')
444                 return None
445             return self._result['body']
446
        def put(self, hash_s, body, timeout=None):
            """PUT one block to this Keep service.

            :hash_s: the block's md5 hex digest, appended to the root URL.
            :body: the block data (bytes).
            :timeout: timeout spec passed to _setcurltimeouts().

            Returns True on success, False on failure; details of the last
            attempt remain available via last_result().
            """
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize any pycurl failure into HttpError for
                        # the outer handler.
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True
518
519         def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
520             if not timeouts:
521                 return
522             elif isinstance(timeouts, tuple):
523                 if len(timeouts) == 2:
524                     conn_t, xfer_t = timeouts
525                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
526                 else:
527                     conn_t, xfer_t, bandwidth_bps = timeouts
528             else:
529                 conn_t, xfer_t = (timeouts, timeouts)
530                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
531             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
532             if not ignore_bandwidth:
533                 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
534                 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
535
536         def _headerfunction(self, header_line):
537             if isinstance(header_line, bytes):
538                 header_line = header_line.decode('iso-8859-1')
539             if ':' in header_line:
540                 name, value = header_line.split(':', 1)
541                 name = name.strip().lower()
542                 value = value.strip()
543             elif self._headers:
544                 name = self._lastheadername
545                 value = self._headers[name] + ' ' + header_line.strip()
546             elif header_line.startswith('HTTP/'):
547                 name = 'x-status-line'
548                 value = header_line
549             else:
550                 _logger.error("Unexpected header line: %s", header_line)
551                 return
552             self._lastheadername = name
553             self._headers[name] = value
554             # Returning None implies all bytes were written
555
556
    class KeepWriterQueue(queue.Queue):
        """Work queue shared by KeepWriterThreads for a single put().

        Each queued item is a (KeepService, service_root) pair to try
        writing to.  The queue also tracks how many replicas have been
        written and lets workers coordinate retries via a Condition.
        """

        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            # Number of replicas the caller asked for.
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            # How many write attempts may currently be in flight;
            # write_fail() bumps this so another worker can retry.
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            # One service accepted the data (storing replicas_nr copies);
            # record it and wake every worker blocked in get_next_task().
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # A write to service ks failed: allow one more attempt and
            # wake one waiting worker to try another service.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            # Replicas still needed to satisfy the request.
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        # Claim one try and hand the next unfinished
                        # service to the calling worker.
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        # Nothing left to try and no retries outstanding.
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Services remain queued but no tries are free;
                        # wait for a write to succeed or fail.
                        self.pending_tries_notification.wait()
610
611
612     class KeepWriterThreadPool(object):
613         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
614             self.total_task_nr = 0
615             self.wanted_copies = copies
616             if (not max_service_replicas) or (max_service_replicas >= copies):
617                 num_threads = 1
618             else:
619                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
620             _logger.debug("Pool max threads is %d", num_threads)
621             self.workers = []
622             self.queue = KeepClient.KeepWriterQueue(copies)
623             # Create workers
624             for _ in range(num_threads):
625                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
626                 self.workers.append(w)
627
628         def add_task(self, ks, service_root):
629             self.queue.put((ks, service_root))
630             self.total_task_nr += 1
631
632         def done(self):
633             return self.queue.successful_copies
634
635         def join(self):
636             # Start workers
637             for worker in self.workers:
638                 worker.start()
639             # Wait for finished work
640             self.queue.join()
641
642         def response(self):
643             return self.queue.response
644
645
    class KeepWriterThread(threading.Thread):
        """Daemon thread that takes services off a KeepWriterQueue and
        PUTs one block to each until the queue signals completion."""

        # Shared sentinel exception: do_task() raises it for failures that
        # were already logged, so run() can tell them apart from unexpected
        # exceptions.  NOTE(review): raising one shared instance is unusual
        # but works in CPython; each raise gets its own traceback.
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    # No more work: enough copies were written, or no
                    # services remain to try.
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """PUT the block to one service; return (locator, replicas)."""
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                # The service reports how many replicas it stored; assume 1
                # when the header is missing or malformed.
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored
699
700
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want KeepClient to not use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :session:
          NOTE(review): accepted but not used anywhere in this
          constructor; presumably kept for call-signature compatibility --
          confirm before removing.
        """
        self.lock = threading.Lock()
        # Resolve the proxy setting from configuration when not given
        # explicitly; ARVADOS_KEEP_SERVICES takes precedence over
        # ARVADOS_KEEP_PROXY.
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        # An API token may come from (in order): the caller, the API
        # client, or the Arvados configuration.  Passing both an API
        # client and a token is ambiguous and rejected.
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Reusable pool of pycurl user agents, shared by all KeepService
        # objects created by this client.
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            # Local-store mode: shadow the class's head/get/put methods
            # with the local_store_* variants on this instance, so all
            # block traffic goes to the local directory instead of Keep.
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Proxy mode: build a static, synthetic services list from
                # the whitespace-separated proxy URIs; no API client needed.
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                # Service lists stay None until build_services_list()
                # fetches them from the API server on first use.
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
831
832     def current_timeout(self, attempt_number):
833         """Return the appropriate timeout to use for this client.
834
835         The proxy timeout setting if the backend service is currently a proxy,
836         the regular timeout setting otherwise.  The `attempt_number` indicates
837         how many times the operation has been tried already (starting from 0
838         for the first try), and scales the connection timeout portion of the
839         return value accordingly.
840
841         """
842         # TODO(twp): the timeout should be a property of a
843         # KeepService, not a KeepClient. See #4488.
844         t = self.proxy_timeout if self.using_proxy else self.timeout
845         if len(t) == 2:
846             return (t[0] * (1 << attempt_number), t[1])
847         else:
848             return (t[0] * (1 << attempt_number), t[1], t[2])
849     def _any_nondisk_services(self, service_list):
850         return any(ks.get('service_type', 'disk') != 'disk'
851                    for ks in service_list)
852
    def build_services_list(self, force_rebuild=False):
        """Fetch and cache the list of Keep services from the API server.

        Populates self._gateway_services, self._keep_services,
        self._writable_services and self.max_replicas_per_service.
        Does nothing when a static (proxy-supplied) services list is in
        use, or when the list is already populated and force_rebuild is
        false.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        # Serialize rebuilds: this mutates several shared attributes.
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Gateway-typed services are excluded from the general probe
            # list; they are only reachable via explicit +K@uuid hints.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
895
896     def _service_weight(self, data_hash, service_uuid):
897         """Compute the weight of a Keep service endpoint for a data
898         block with a known hash.
899
900         The weight is md5(h + u) where u is the last 15 characters of
901         the service endpoint's UUID.
902         """
903         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
904
905     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
906         """Return an array of Keep service endpoints, in the order in
907         which they should be probed when reading or writing data with
908         the given hash+hints.
909         """
910         self.build_services_list(force_rebuild)
911
912         sorted_roots = []
913         # Use the services indicated by the given +K@... remote
914         # service hints, if any are present and can be resolved to a
915         # URI.
916         for hint in locator.hints:
917             if hint.startswith('K@'):
918                 if len(hint) == 7:
919                     sorted_roots.append(
920                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
921                 elif len(hint) == 29:
922                     svc = self._gateway_services.get(hint[2:])
923                     if svc:
924                         sorted_roots.append(svc['_service_root'])
925
926         # Sort the available local services by weight (heaviest first)
927         # for this locator, and return their service_roots (base URIs)
928         # in that order.
929         use_services = self._keep_services
930         if need_writable:
931             use_services = self._writable_services
932         self.using_proxy = self._any_nondisk_services(use_services)
933         sorted_roots.extend([
934             svc['_service_root'] for svc in sorted(
935                 use_services,
936                 reverse=True,
937                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
938         _logger.debug("{}: {}".format(locator, sorted_roots))
939         return sorted_roots
940
941     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
942         # roots_map is a dictionary, mapping Keep service root strings
943         # to KeepService objects.  Poll for Keep services, and add any
944         # new ones to roots_map.  Return the current list of local
945         # root strings.
946         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
947         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
948         for root in local_roots:
949             if root not in roots_map:
950                 roots_map[root] = self.KeepService(
951                     root, self._user_agent_pool,
952                     upload_counter=self.upload_counter,
953                     download_counter=self.download_counter,
954                     headers=headers,
955                     insecure=self.insecure)
956         return local_roots
957
958     @staticmethod
959     def _check_loop_result(result):
960         # KeepClient RetryLoops should save results as a 2-tuple: the
961         # actual result of the request, and the number of servers available
962         # to receive the request this round.
963         # This method returns True if there's a real result, False if
964         # there are no more servers available, otherwise None.
965         if isinstance(result, Exception):
966             return None
967         result, tried_server_count = result
968         if (result is not None) and (result is not False):
969             return True
970         elif tried_server_count < 1:
971             _logger.info("No more Keep services to try; giving up")
972             return False
973         else:
974             return None
975
976     def get_from_cache(self, loc):
977         """Fetch a block only if is in the cache, otherwise return None."""
978         slot = self.block_cache.get(loc)
979         if slot is not None and slot.ready.is_set():
980             return slot.get()
981         else:
982             return None
983
984     def refresh_signature(self, loc):
985         """Ask Keep to get the remote block and return its local signature"""
986         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
987         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
988
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Check whether a block exists in Keep (HTTP HEAD).

        Accepts the same keyword arguments as get(); see _get_or_head()
        for details.
        """
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
992
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch a block's content from Keep (HTTP GET).

        Accepts the same keyword arguments as head(); see _get_or_head()
        for details.
        """
        return self._get_or_head(loc_s, method="GET", **kwargs)
996
997     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
998         """Get data from Keep.
999
1000         This method fetches one or more blocks of data from Keep.  It
1001         sends a request each Keep service registered with the API
1002         server (or the proxy provided when this client was
1003         instantiated), then each service named in location hints, in
1004         sequence.  As soon as one service provides the data, it's
1005         returned.
1006
1007         Arguments:
1008         * loc_s: A string of one or more comma-separated locators to fetch.
1009           This method returns the concatenation of these blocks.
1010         * num_retries: The number of times to retry GET requests to
1011           *each* Keep server if it returns temporary failures, with
1012           exponential backoff.  Note that, in each loop, the method may try
1013           to fetch data from every available Keep service, along with any
1014           that are named in location hints in the locator.  The default value
1015           is set when the KeepClient is initialized.
1016         """
1017         if ',' in loc_s:
1018             return ''.join(self.get(x) for x in loc_s.split(','))
1019
1020         self.get_counter.add(1)
1021
1022         slot = None
1023         blob = None
1024         try:
1025             locator = KeepLocator(loc_s)
1026             if method == "GET":
1027                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1028                 if not first:
1029                     self.hits_counter.add(1)
1030                     blob = slot.get()
1031                     if blob is None:
1032                         raise arvados.errors.KeepReadError(
1033                             "failed to read {}".format(loc_s))
1034                     return blob
1035
1036             self.misses_counter.add(1)
1037
1038             if headers is None:
1039                 headers = {}
1040             headers['X-Request-Id'] = (request_id or
1041                                         (hasattr(self, 'api_client') and self.api_client.request_id) or
1042                                         arvados.util.new_request_id())
1043
1044             # If the locator has hints specifying a prefix (indicating a
1045             # remote keepproxy) or the UUID of a local gateway service,
1046             # read data from the indicated service(s) instead of the usual
1047             # list of local disk services.
1048             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1049                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1050             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1051                                for hint in locator.hints if (
1052                                        hint.startswith('K@') and
1053                                        len(hint) == 29 and
1054                                        self._gateway_services.get(hint[2:])
1055                                        )])
1056             # Map root URLs to their KeepService objects.
1057             roots_map = {
1058                 root: self.KeepService(root, self._user_agent_pool,
1059                                        upload_counter=self.upload_counter,
1060                                        download_counter=self.download_counter,
1061                                        headers=headers,
1062                                        insecure=self.insecure)
1063                 for root in hint_roots
1064             }
1065
1066             # See #3147 for a discussion of the loop implementation.  Highlights:
1067             # * Refresh the list of Keep services after each failure, in case
1068             #   it's being updated.
1069             # * Retry until we succeed, we're out of retries, or every available
1070             #   service has returned permanent failure.
1071             sorted_roots = []
1072             roots_map = {}
1073             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1074                                    backoff_start=2)
1075             for tries_left in loop:
1076                 try:
1077                     sorted_roots = self.map_new_services(
1078                         roots_map, locator,
1079                         force_rebuild=(tries_left < num_retries),
1080                         need_writable=False,
1081                         headers=headers)
1082                 except Exception as error:
1083                     loop.save_result(error)
1084                     continue
1085
1086                 # Query KeepService objects that haven't returned
1087                 # permanent failure, in our specified shuffle order.
1088                 services_to_try = [roots_map[root]
1089                                    for root in sorted_roots
1090                                    if roots_map[root].usable()]
1091                 for keep_service in services_to_try:
1092                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1093                     if blob is not None:
1094                         break
1095                 loop.save_result((blob, len(services_to_try)))
1096
1097             # Always cache the result, then return it if we succeeded.
1098             if loop.success():
1099                 return blob
1100         finally:
1101             if slot is not None:
1102                 slot.set(blob)
1103                 self.block_cache.cap_cache()
1104
1105         # Q: Including 403 is necessary for the Keep tests to continue
1106         # passing, but maybe they should expect KeepReadError instead?
1107         not_founds = sum(1 for key in sorted_roots
1108                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1109         service_errors = ((key, roots_map[key].last_result()['error'])
1110                           for key in sorted_roots)
1111         if not roots_map:
1112             raise arvados.errors.KeepReadError(
1113                 "failed to read {}: no Keep services available ({})".format(
1114                     loc_s, loop.last_result()))
1115         elif not_founds == len(sorted_roots):
1116             raise arvados.errors.NotFoundError(
1117                 "{} not found".format(loc_s), service_errors)
1118         else:
1119             raise arvados.errors.KeepReadError(
1120                 "failed to read {}".format(loc_s), service_errors, label="service")
1121
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        # Keep stores bytes; transparently encode text input.
        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        # Zero copies requested: nothing to upload, but still return a
        # valid locator for the data.
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {
            'X-Request-Id': (request_id or
                             (hasattr(self, 'api_client') and self.api_client.request_id) or
                             arvados.util.new_request_id()),
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        # Retry loop mirrors _get_or_head(): refresh the service list
        # after failures, and stop when enough copies are stored, retries
        # run out, or every writable service has failed permanently.
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Number of replicas confirmed written so far, across rounds.
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Upload to the remaining services in parallel, asking only
            # for the copies still missing from previous rounds.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                # Skip services that already accepted the block.
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
1201
1202     def local_store_put(self, data, copies=1, num_retries=None):
1203         """A stub for put().
1204
1205         This method is used in place of the real put() method when
1206         using local storage (see constructor's local_store argument).
1207
1208         copies and num_retries arguments are ignored: they are here
1209         only for the sake of offering the same call signature as
1210         put().
1211
1212         Data stored this way can be retrieved via local_store_get().
1213         """
1214         md5 = hashlib.md5(data).hexdigest()
1215         locator = '%s+%d' % (md5, len(data))
1216         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1217             f.write(data)
1218         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1219                   os.path.join(self.local_store, md5))
1220         return locator
1221
1222     def local_store_get(self, loc_s, num_retries=None):
1223         """Companion to local_store_put()."""
1224         try:
1225             locator = KeepLocator(loc_s)
1226         except ValueError:
1227             raise arvados.errors.NotFoundError(
1228                 "Invalid data locator: '%s'" % loc_s)
1229         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1230             return b''
1231         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1232             return f.read()
1233
1234     def local_store_head(self, loc_s, num_retries=None):
1235         """Companion to local_store_put()."""
1236         try:
1237             locator = KeepLocator(loc_s)
1238         except ValueError:
1239             raise arvados.errors.NotFoundError(
1240                 "Invalid data locator: '%s'" % loc_s)
1241         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1242             return True
1243         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1244             return True
1245
1246     def is_cached(self, locator):
1247         return self.block_cache.reserve_cache(expect_hash)