# arvados.git: sdk/python/arvados/keep.py
# 11308: Avoid Python2-inefficient list() operations.
from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import next
from builtins import str
from builtins import range
from builtins import object
import io
import datetime
import hashlib
import logging
import math
import os
import pycurl
import queue
import re
import socket
import ssl
import sys
import threading
from . import timer
import urllib.parse

if sys.version_info >= (3, 0):
    from io import BytesIO
else:
    from cStringIO import StringIO as BytesIO

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None

# Monkey-patch TCP constants when not available (macOS). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt

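# Illustrative KeepLocator usage (a sketch; the hash below is the MD5 of the
# empty string, not real cluster data):
#
#   >>> loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0')
#   >>> loc.md5sum
#   'd41d8cd98f00b204e9800998ecf8427e'
#   >>> loc.size
#   0
#   >>> str(loc)
#   'd41d8cd98f00b204e9800998ecf8427e+0'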

class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)

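# A hedged sketch of the preferred replacement for the deprecated Keep
# wrapper above: build a KeepClient around your own API client (assumes a
# configured Arvados environment):
#
#   >>> api = arvados.api('v1')                   # doctest: +SKIP
#   >>> keep_client = KeepClient(api_client=api)  # doctest: +SKIP
#   >>> keep_client.put(b'foo')                   # doctest: +SKIP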
class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

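# Typical reserve/fill pattern for the cache (a sketch; fetch_block() is a
# hypothetical helper, not part of this module):
#
#   cache = KeepBlockCache()
#   slot, first = cache.reserve_cache(locator)
#   if first:
#       slot.set(fetch_block(locator))  # hypothetical fetch
#       cache.cap_cache()
#   data = slot.get()  # blocks until some thread calls slot.set()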
class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)


    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self._socket = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Setting TCP_KEEPIDLE raises an invalid-protocol error on
            # macOS, so only set it where the constant exists.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            self._socket = s
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result['headers'].get('content-length'))
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

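        # Illustrative mapping (not additional configuration): a 3-tuple
        # timeout such as KeepClient.DEFAULT_TIMEOUT = (2, 256, 32768)
        # becomes the following curl options:
        #
        #   curl.setopt(pycurl.CONNECTTIMEOUT_MS, 2000)  # 2s to connect
        #   curl.setopt(pycurl.LOW_SPEED_TIME, 256)      # abort if below...
        #   curl.setopt(pycurl.LOW_SPEED_LIMIT, 32768)   # ...32768 B/s for 256s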
        def _headerfunction(self, header_line):
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(queue.Queue):
        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(1.0*copies/max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

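    # How KeepClient.put() drives the pool (a condensed sketch, not extra
    # API; roots_map maps service root URLs to KeepService objects):
    #
    #   pool = KeepClient.KeepWriterThreadPool(
    #       data=data, data_hash=data_hash, copies=2,
    #       max_service_replicas=1)
    #   for root, ks in roots_map.items():
    #       pool.add_task(ks, root)
    #   pool.join()
    #   pool.done()      # replicas successfully written
    #   pool.response()  # body of the most recent successful PUT
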
    class KeepWriterThread(threading.Thread):
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])

    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()

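    # Probe-order sketch (made-up UUIDs): every client sorts services the
    # same way for a given block, spreading load while keeping lookups
    # deterministic:
    #
    #   h = 'acbd18db4cc2f85cedef654fccc4a4d8'  # md5('foo')
    #   uuids = ['zzzzz-bi6l4-000000000000001', 'zzzzz-bi6l4-000000000000002']
    #   probe_order = sorted(
    #       uuids, reverse=True,
    #       key=lambda u: hashlib.md5((h + u[-15:]).encode()).hexdigest())
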
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

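    # Loop-result convention (summarized from the callers below): each
    # RetryLoop iteration saves a (result, servers_tried) 2-tuple, e.g.
    #
    #   loop.save_result((blob, len(services_to_try)))                 # _get_or_head
    #   loop.save_result((done >= copies, writer_pool.total_task_nr))  # put
    #
    # and _check_loop_result() maps that to True (done), False (no
    # services left), or None (retry).
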
    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then to each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return b''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                v = slot.get()
                return v

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                                   )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

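    # Illustrative calls (the locator is md5('foo')+size with no permission
    # signature; real clusters normally require one):
    #
    #   kc.head('acbd18db4cc2f85cedef654fccc4a4d8+3')  # True if any service has it
    #   kc.get('acbd18db4cc2f85cedef654fccc4a4d8+3')   # b'foo' on success
    #   kc.get('loc1+3,loc2+3')  # blocks fetched in order and concatenated
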
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            writer_pool = KeepClient.KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")

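    # Round-trip sketch (assumes a reachable cluster; the returned locator
    # shape is illustrative):
    #
    #   kc = KeepClient(api_client=arvados.api('v1'))
    #   loc = kc.put(b'foo', copies=2)  # e.g. 'acbd...a4d8+3+A<sig>@<expiry>'
    #   kc.get(loc) == b'foo'
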
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return b''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()

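    # Local-store sketch: with local_store='/tmp/keep' (or the
    # KEEP_LOCAL_STORE environment variable), put()/get() are rebound to
    # the two methods above and blocks live as plain files:
    #
    #   kc = KeepClient(local_store='/tmp/keep')
    #   loc = kc.put(b'foo')  # writes /tmp/keep/acbd18db4cc2f85cedef654fccc4a4d8
    #   kc.get(loc)           # b'foo'
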
    def is_cached(self, locator):
        return self.block_cache.get(locator) is not None