11308: Futurize stage2.
[arvados.git] / sdk / python / arvados / keep.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import next
6 from builtins import str
7 from builtins import range
8 from past.utils import old_div
9 from builtins import object
10 import io
11 import datetime
12 import hashlib
13 import logging
14 import math
15 import os
16 import pycurl
17 import queue
18 import re
19 import socket
20 import ssl
21 import sys
22 import threading
23 from . import timer
24 import urllib.parse
25
26 import arvados
27 import arvados.config as config
28 import arvados.errors
29 import arvados.retry as retry
30 import arvados.util
31
_logger = logging.getLogger('arvados.keep')
# Cached KeepClient shared by the deprecated Keep.global_client_object() API.
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
45
46
class KeepLocator(object):
    """Parse and reassemble a Keep block locator string.

    A locator has the form ``md5sum[+size][+hints...]`` where each hint
    starts with an uppercase letter.  Permission hints
    (``A<signature>@<hex expiry>``) are parsed into perm_sig/perm_expiry;
    all other hints are kept verbatim in self.hints.
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str.  Raises ValueError on malformed input."""
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare "md5sum" locator without a size.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed (md5sum[+size])."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A<sig>@<hex expiry>' hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an 'A<sig>@<expiry>' hint into perm_sig/perm_expiry."""
        try:
            sig, expiry = s[1:].split('@', 1)
        except ValueError:
            # BUGFIX: a hint with no '@' separator fails the tuple unpack
            # with ValueError, not IndexError, so the old
            # `except IndexError` handler could never fire.
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        Compares against as_of_dt, or the current UTC time if omitted.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # BUGFIX: perm_expiry is a naive UTC datetime
            # (utcfromtimestamp), so it must be compared against UTC now,
            # not the machine's local time.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
128
129
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # KeepClient used to adjust its behavior at runtime as these
        # settings changed.  Emulate that here: whenever any of them
        # differs from the previous call, discard the cached client and
        # build a fresh one.
        key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is None or key != cls._last_key:
            cls._last_key = key
            global_client_object = KeepClient()
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block via the shared client (deprecated)."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block via the shared client (deprecated)."""
        return Keep.global_client_object().put(data, **kwargs)
164
class KeepBlockCache(object):
    """In-memory cache of Keep block contents, keyed by locator.

    The slot list is kept in most-recently-used-first order; cap_cache()
    evicts from the least recently used end.
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []  # MRU-first list of CacheSlot
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block: readers wait on `ready` until the
        fetching thread calls set() with the block contents (or None on
        a failed read)."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the content has been filled in, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                evicted = False
                # Evict the least recently used slot that is ready;
                # never evict a slot a reader may still be waiting on.
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        sm -= self._cache[i].size()
                        del self._cache[i]
                        evicted = True
                        break
                if not evicted:
                    # BUGFIX: no evictable slot remains, so the size sum
                    # cannot shrink; the old code would busy-spin here
                    # (recomputing the sum forever) while holding
                    # _cache_lock.  Give up and let a later cap_cache()
                    # call finish the job.
                    break

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # Move it to the front: most recently used.
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the CacheSlot for locator, or None.  Thread-safe."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
236
class Counter(object):
    """Thread-safe additive counter."""

    def __init__(self, v=0):
        # A lock guards every read and write of the running total.
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Atomically add v to the counter."""
        with self._lk:
            self._val = self._val + v

    def get(self):
        """Return a consistent snapshot of the current total."""
        with self._lk:
            return self._val
249
250
251 class KeepClient(object):
252
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    # Each tuple is (connection_timeout, read_timeout, minimum_bandwidth);
    # see the constructor docstring for how the three values are applied.
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
261
262
263     class KeepService(object):
264         """Make requests to a single Keep service, and track results.
265
266         A KeepService is intended to last long enough to perform one
267         transaction (GET or PUT) against one Keep service. This can
268         involve calling either get() or put() multiple times in order
269         to retry after transient failures. However, calling both get()
270         and put() on a single instance -- or using the same instance
271         to access two different Keep services -- will not produce
272         sensible behavior.
273         """
274
275         HTTP_ERRORS = (
276             socket.error,
277             ssl.SSLError,
278             arvados.errors.HttpError,
279         )
280
281         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
282                      upload_counter=None,
283                      download_counter=None, **headers):
284             self.root = root
285             self._user_agent_pool = user_agent_pool
286             self._result = {'error': None}
287             self._usable = True
288             self._session = None
289             self.get_headers = {'Accept': 'application/octet-stream'}
290             self.get_headers.update(headers)
291             self.put_headers = headers
292             self.upload_counter = upload_counter
293             self.download_counter = download_counter
294
295         def usable(self):
296             """Is it worth attempting a request?"""
297             return self._usable
298
299         def finished(self):
300             """Did the request succeed or encounter permanent failure?"""
301             return self._result['error'] == False or not self._usable
302
303         def last_result(self):
304             return self._result
305
306         def _get_user_agent(self):
307             try:
308                 return self._user_agent_pool.get(block=False)
309             except queue.Empty:
310                 return pycurl.Curl()
311
312         def _put_user_agent(self, ua):
313             try:
314                 ua.reset()
315                 self._user_agent_pool.put(ua, block=False)
316             except:
317                 ua.close()
318
319         @staticmethod
320         def _socket_open(family, socktype, protocol, address=None):
321             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
322             s = socket.socket(family, socktype, protocol)
323             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
324             # Will throw invalid protocol error on mac. This test prevents that.
325             if hasattr(socket, 'TCP_KEEPIDLE'):
326                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
327             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
328             return s
329
330         def get(self, locator, method="GET", timeout=None):
331             # locator is a KeepLocator object.
332             url = self.root + str(locator)
333             _logger.debug("Request: %s %s", method, url)
334             curl = self._get_user_agent()
335             ok = None
336             try:
337                 with timer.Timer() as t:
338                     self._headers = {}
339                     response_body = io.StringIO()
340                     curl.setopt(pycurl.NOSIGNAL, 1)
341                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
342                     curl.setopt(pycurl.URL, url.encode('utf-8'))
343                     curl.setopt(pycurl.HTTPHEADER, [
344                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
345                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
346                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
347                     if method == "HEAD":
348                         curl.setopt(pycurl.NOBODY, True)
349                     self._setcurltimeouts(curl, timeout)
350
351                     try:
352                         curl.perform()
353                     except Exception as e:
354                         raise arvados.errors.HttpError(0, str(e))
355                     self._result = {
356                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
357                         'body': response_body.getvalue(),
358                         'headers': self._headers,
359                         'error': False,
360                     }
361
362                 ok = retry.check_http_response_success(self._result['status_code'])
363                 if not ok:
364                     self._result['error'] = arvados.errors.HttpError(
365                         self._result['status_code'],
366                         self._headers.get('x-status-line', 'Error'))
367             except self.HTTP_ERRORS as e:
368                 self._result = {
369                     'error': e,
370                 }
371             self._usable = ok != False
372             if self._result.get('status_code', None):
373                 # The client worked well enough to get an HTTP status
374                 # code, so presumably any problems are just on the
375                 # server side and it's OK to reuse the client.
376                 self._put_user_agent(curl)
377             else:
378                 # Don't return this client to the pool, in case it's
379                 # broken.
380                 curl.close()
381             if not ok:
382                 _logger.debug("Request fail: GET %s => %s: %s",
383                               url, type(self._result['error']), str(self._result['error']))
384                 return None
385             if method == "HEAD":
386                 _logger.info("HEAD %s: %s bytes",
387                          self._result['status_code'],
388                          self._result.get('content-length'))
389                 return True
390
391             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
392                          self._result['status_code'],
393                          len(self._result['body']),
394                          t.msecs,
395                          old_div((old_div(len(self._result['body']),(1024.0*1024))),t.secs) if t.secs > 0 else 0)
396
397             if self.download_counter:
398                 self.download_counter.add(len(self._result['body']))
399             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
400             if resp_md5 != locator.md5sum:
401                 _logger.warning("Checksum fail: md5(%s) = %s",
402                                 url, resp_md5)
403                 self._result['error'] = arvados.errors.HttpError(
404                     0, 'Checksum fail')
405                 return None
406             return self._result['body']
407
408         def put(self, hash_s, body, timeout=None):
409             url = self.root + hash_s
410             _logger.debug("Request: PUT %s", url)
411             curl = self._get_user_agent()
412             ok = None
413             try:
414                 with timer.Timer() as t:
415                     self._headers = {}
416                     body_reader = io.StringIO(body)
417                     response_body = io.StringIO()
418                     curl.setopt(pycurl.NOSIGNAL, 1)
419                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
420                     curl.setopt(pycurl.URL, url.encode('utf-8'))
421                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
422                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
423                     # response) instead of sending the request body immediately.
424                     # This allows the server to reject the request if the request
425                     # is invalid or the server is read-only, without waiting for
426                     # the client to send the entire block.
427                     curl.setopt(pycurl.UPLOAD, True)
428                     curl.setopt(pycurl.INFILESIZE, len(body))
429                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
430                     curl.setopt(pycurl.HTTPHEADER, [
431                         '{}: {}'.format(k,v) for k,v in self.put_headers.items()])
432                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
433                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
434                     self._setcurltimeouts(curl, timeout)
435                     try:
436                         curl.perform()
437                     except Exception as e:
438                         raise arvados.errors.HttpError(0, str(e))
439                     self._result = {
440                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
441                         'body': response_body.getvalue(),
442                         'headers': self._headers,
443                         'error': False,
444                     }
445                 ok = retry.check_http_response_success(self._result['status_code'])
446                 if not ok:
447                     self._result['error'] = arvados.errors.HttpError(
448                         self._result['status_code'],
449                         self._headers.get('x-status-line', 'Error'))
450             except self.HTTP_ERRORS as e:
451                 self._result = {
452                     'error': e,
453                 }
454             self._usable = ok != False # still usable if ok is True or None
455             if self._result.get('status_code', None):
456                 # Client is functional. See comment in get().
457                 self._put_user_agent(curl)
458             else:
459                 curl.close()
460             if not ok:
461                 _logger.debug("Request fail: PUT %s => %s: %s",
462                               url, type(self._result['error']), str(self._result['error']))
463                 return False
464             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
465                          self._result['status_code'],
466                          len(body),
467                          t.msecs,
468                          old_div((old_div(len(body),(1024.0*1024))),t.secs) if t.secs > 0 else 0)
469             if self.upload_counter:
470                 self.upload_counter.add(len(body))
471             return True
472
473         def _setcurltimeouts(self, curl, timeouts):
474             if not timeouts:
475                 return
476             elif isinstance(timeouts, tuple):
477                 if len(timeouts) == 2:
478                     conn_t, xfer_t = timeouts
479                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
480                 else:
481                     conn_t, xfer_t, bandwidth_bps = timeouts
482             else:
483                 conn_t, xfer_t = (timeouts, timeouts)
484                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
485             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
486             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
487             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
488
489         def _headerfunction(self, header_line):
490             header_line = header_line.decode('iso-8859-1')
491             if ':' in header_line:
492                 name, value = header_line.split(':', 1)
493                 name = name.strip().lower()
494                 value = value.strip()
495             elif self._headers:
496                 name = self._lastheadername
497                 value = self._headers[name] + ' ' + header_line.strip()
498             elif header_line.startswith('HTTP/'):
499                 name = 'x-status-line'
500                 value = header_line
501             else:
502                 _logger.error("Unexpected header line: %s", header_line)
503                 return
504             self._lastheadername = name
505             self._headers[name] = value
506             # Returning None implies all bytes were written
507     
508
    class KeepWriterQueue(queue.Queue):
        """Work queue shared by KeepWriterThreads during one put() operation.

        Each queued item is a (KeepService, service_root) pair to try.
        The queue tracks how many replicas have been written so far and
        uses a Condition to hand out exactly enough tries to reach the
        wanted number of copies, re-issuing a try whenever a write fails.
        """

        def __init__(self, copies):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies           # replicas the caller asked for
            self.successful_copies = 0            # replicas confirmed written
            self.response = None                  # body of the last successful PUT
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies           # tries workers may still claim
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            """Record that a service stored replicas_nr copies; wake all waiters."""
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            """Record a failed write: release one more try and wake one waiter."""
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            """Replicas still needed (<= 0 once the goal has been reached)."""
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            """Claim the next (service, service_root) pair to try.

            Blocks until a try is available.  Raises queue.Empty when the
            wanted number of copies has been reached or there are no
            services left to try.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # Already succeeded or failed permanently;
                            # don't try this service again.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        # Nothing queued and no outstanding tries: give up.
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        # Another worker may still fail and return its try;
                        # wait to be notified before re-checking.
                        self.pending_tries_notification.wait()
562
563
564     class KeepWriterThreadPool(object):
565         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
566             self.total_task_nr = 0
567             self.wanted_copies = copies
568             if (not max_service_replicas) or (max_service_replicas >= copies):
569                 num_threads = 1
570             else:
571                 num_threads = int(math.ceil(old_div(float(copies), max_service_replicas)))
572             _logger.debug("Pool max threads is %d", num_threads)
573             self.workers = []
574             self.queue = KeepClient.KeepWriterQueue(copies)
575             # Create workers
576             for _ in range(num_threads):
577                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
578                 self.workers.append(w)
579         
580         def add_task(self, ks, service_root):
581             self.queue.put((ks, service_root))
582             self.total_task_nr += 1
583         
584         def done(self):
585             return self.queue.successful_copies
586         
587         def join(self):
588             # Start workers
589             for worker in self.workers:
590                 worker.start()
591             # Wait for finished work
592             self.queue.join()
593         
594         def response(self):
595             return self.queue.response
596     
597     
    class KeepWriterThread(threading.Thread):
        """Worker that pulls (service, root) tasks from a KeepWriterQueue
        and PUTs one data block to each service until the queue reports
        that enough copies have been written.
        """

        # Shared sentinel exception instance: do_task() raises it for an
        # ordinary failed PUT so run() can distinguish it (by identity)
        # from unexpected exceptions, which get logged with a traceback.
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout        # per-request timeout for KeepService.put()
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True            # don't keep the interpreter alive

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    # Enough copies written, or no services left to try:
                    # let this worker exit.
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    # Balance the get_nowait() in get_next_task() so that
                    # queue.join() can complete.
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """PUT the block to one service; return (locator, replica count).

            Raises the TaskFailed sentinel when the service reported
            failure.
            """
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                # Keep proxies report how many replicas they stored on our
                # behalf; plain keepstore servers may omit the header.
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored
651
652
653     def __init__(self, api_client=None, proxy=None,
654                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
655                  api_token=None, local_store=None, block_cache=None,
656                  num_retries=0, session=None):
657         """Initialize a new KeepClient.
658
659         Arguments:
660         :api_client:
661           The API client to use to find Keep services.  If not
662           provided, KeepClient will build one from available Arvados
663           configuration.
664
665         :proxy:
666           If specified, this KeepClient will send requests to this Keep
667           proxy.  Otherwise, KeepClient will fall back to the setting of the
668           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
669           If you want to KeepClient does not use a proxy, pass in an empty
670           string.
671
672         :timeout:
673           The initial timeout (in seconds) for HTTP requests to Keep
674           non-proxy servers.  A tuple of three floats is interpreted as
675           (connection_timeout, read_timeout, minimum_bandwidth). A connection
676           will be aborted if the average traffic rate falls below
677           minimum_bandwidth bytes per second over an interval of read_timeout
678           seconds. Because timeouts are often a result of transient server
679           load, the actual connection timeout will be increased by a factor
680           of two on each retry.
681           Default: (2, 256, 32768).
682
683         :proxy_timeout:
684           The initial timeout (in seconds) for HTTP requests to
685           Keep proxies. A tuple of three floats is interpreted as
686           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
687           described above for adjusting connection timeouts on retry also
688           applies.
689           Default: (20, 256, 32768).
690
691         :api_token:
692           If you're not using an API client, but only talking
693           directly to a Keep proxy, this parameter specifies an API token
694           to authenticate Keep requests.  It is an error to specify both
695           api_client and api_token.  If you specify neither, KeepClient
696           will use one available from the Arvados configuration.
697
698         :local_store:
699           If specified, this KeepClient will bypass Keep
700           services, and save data to the named directory.  If unspecified,
701           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
702           environment variable.  If you want to ensure KeepClient does not
703           use local storage, pass in an empty string.  This is primarily
704           intended to mock a server for testing.
705
706         :num_retries:
707           The default number of times to retry failed requests.
708           This will be used as the default num_retries value when get() and
709           put() are called.  Default 0.
710         """
711         self.lock = threading.Lock()
712         if proxy is None:
713             if config.get('ARVADOS_KEEP_SERVICES'):
714                 proxy = config.get('ARVADOS_KEEP_SERVICES')
715             else:
716                 proxy = config.get('ARVADOS_KEEP_PROXY')
717         if api_token is None:
718             if api_client is None:
719                 api_token = config.get('ARVADOS_API_TOKEN')
720             else:
721                 api_token = api_client.api_token
722         elif api_client is not None:
723             raise ValueError(
724                 "can't build KeepClient with both API client and token")
725         if local_store is None:
726             local_store = os.environ.get('KEEP_LOCAL_STORE')
727
728         self.block_cache = block_cache if block_cache else KeepBlockCache()
729         self.timeout = timeout
730         self.proxy_timeout = proxy_timeout
731         self._user_agent_pool = queue.LifoQueue()
732         self.upload_counter = Counter()
733         self.download_counter = Counter()
734         self.put_counter = Counter()
735         self.get_counter = Counter()
736         self.hits_counter = Counter()
737         self.misses_counter = Counter()
738
739         if local_store:
740             self.local_store = local_store
741             self.get = self.local_store_get
742             self.put = self.local_store_put
743         else:
744             self.num_retries = num_retries
745             self.max_replicas_per_service = None
746             if proxy:
747                 proxy_uris = proxy.split()
748                 for i in range(len(proxy_uris)):
749                     if not proxy_uris[i].endswith('/'):
750                         proxy_uris[i] += '/'
751                     # URL validation
752                     url = urllib.parse.urlparse(proxy_uris[i])
753                     if not (url.scheme and url.netloc):
754                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
755                 self.api_token = api_token
756                 self._gateway_services = {}
757                 self._keep_services = [{
758                     'uuid': "00000-bi6l4-%015d" % idx,
759                     'service_type': 'proxy',
760                     '_service_root': uri,
761                     } for idx, uri in enumerate(proxy_uris)]
762                 self._writable_services = self._keep_services
763                 self.using_proxy = True
764                 self._static_services_list = True
765             else:
766                 # It's important to avoid instantiating an API client
767                 # unless we actually need one, for testing's sake.
768                 if api_client is None:
769                     api_client = arvados.api('v1')
770                 self.api_client = api_client
771                 self.api_token = api_client.api_token
772                 self._gateway_services = {}
773                 self._keep_services = None
774                 self._writable_services = None
775                 self.using_proxy = None
776                 self._static_services_list = False
777
778     def current_timeout(self, attempt_number):
779         """Return the appropriate timeout to use for this client.
780
781         The proxy timeout setting if the backend service is currently a proxy,
782         the regular timeout setting otherwise.  The `attempt_number` indicates
783         how many times the operation has been tried already (starting from 0
784         for the first try), and scales the connection timeout portion of the
785         return value accordingly.
786
787         """
788         # TODO(twp): the timeout should be a property of a
789         # KeepService, not a KeepClient. See #4488.
790         t = self.proxy_timeout if self.using_proxy else self.timeout
791         if len(t) == 2:
792             return (t[0] * (1 << attempt_number), t[1])
793         else:
794             return (t[0] * (1 << attempt_number), t[1], t[2])
795     def _any_nondisk_services(self, service_list):
796         return any(ks.get('service_type', 'disk') != 'disk'
797                    for ks in service_list)
798
799     def build_services_list(self, force_rebuild=False):
800         if (self._static_services_list or
801               (self._keep_services and not force_rebuild)):
802             return
803         with self.lock:
804             try:
805                 keep_services = self.api_client.keep_services().accessible()
806             except Exception:  # API server predates Keep services.
807                 keep_services = self.api_client.keep_disks().list()
808
809             # Gateway services are only used when specified by UUID,
810             # so there's nothing to gain by filtering them by
811             # service_type.
812             self._gateway_services = {ks['uuid']: ks for ks in
813                                       keep_services.execute()['items']}
814             if not self._gateway_services:
815                 raise arvados.errors.NoKeepServersError()
816
817             # Precompute the base URI for each service.
818             for r in self._gateway_services.values():
819                 host = r['service_host']
820                 if not host.startswith('[') and host.find(':') >= 0:
821                     # IPv6 URIs must be formatted like http://[::1]:80/...
822                     host = '[' + host + ']'
823                 r['_service_root'] = "{}://{}:{:d}/".format(
824                     'https' if r['service_ssl_flag'] else 'http',
825                     host,
826                     r['service_port'])
827
828             _logger.debug(str(self._gateway_services))
829             self._keep_services = [
830                 ks for ks in self._gateway_services.values()
831                 if not ks.get('service_type', '').startswith('gateway:')]
832             self._writable_services = [ks for ks in self._keep_services
833                                        if not ks.get('read_only')]
834
835             # For disk type services, max_replicas_per_service is 1
836             # It is unknown (unlimited) for other service types.
837             if self._any_nondisk_services(self._writable_services):
838                 self.max_replicas_per_service = None
839             else:
840                 self.max_replicas_per_service = 1
841
842     def _service_weight(self, data_hash, service_uuid):
843         """Compute the weight of a Keep service endpoint for a data
844         block with a known hash.
845
846         The weight is md5(h + u) where u is the last 15 characters of
847         the service endpoint's UUID.
848         """
849         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
850
851     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
852         """Return an array of Keep service endpoints, in the order in
853         which they should be probed when reading or writing data with
854         the given hash+hints.
855         """
856         self.build_services_list(force_rebuild)
857
858         sorted_roots = []
859         # Use the services indicated by the given +K@... remote
860         # service hints, if any are present and can be resolved to a
861         # URI.
862         for hint in locator.hints:
863             if hint.startswith('K@'):
864                 if len(hint) == 7:
865                     sorted_roots.append(
866                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
867                 elif len(hint) == 29:
868                     svc = self._gateway_services.get(hint[2:])
869                     if svc:
870                         sorted_roots.append(svc['_service_root'])
871
872         # Sort the available local services by weight (heaviest first)
873         # for this locator, and return their service_roots (base URIs)
874         # in that order.
875         use_services = self._keep_services
876         if need_writable:
877             use_services = self._writable_services
878         self.using_proxy = self._any_nondisk_services(use_services)
879         sorted_roots.extend([
880             svc['_service_root'] for svc in sorted(
881                 use_services,
882                 reverse=True,
883                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
884         _logger.debug("{}: {}".format(locator, sorted_roots))
885         return sorted_roots
886
887     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
888         # roots_map is a dictionary, mapping Keep service root strings
889         # to KeepService objects.  Poll for Keep services, and add any
890         # new ones to roots_map.  Return the current list of local
891         # root strings.
892         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
893         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
894         for root in local_roots:
895             if root not in roots_map:
896                 roots_map[root] = self.KeepService(
897                     root, self._user_agent_pool,
898                     upload_counter=self.upload_counter,
899                     download_counter=self.download_counter,
900                     **headers)
901         return local_roots
902
903     @staticmethod
904     def _check_loop_result(result):
905         # KeepClient RetryLoops should save results as a 2-tuple: the
906         # actual result of the request, and the number of servers available
907         # to receive the request this round.
908         # This method returns True if there's a real result, False if
909         # there are no more servers available, otherwise None.
910         if isinstance(result, Exception):
911             return None
912         result, tried_server_count = result
913         if (result is not None) and (result is not False):
914             return True
915         elif tried_server_count < 1:
916             _logger.info("No more Keep services to try; giving up")
917             return False
918         else:
919             return None
920
921     def get_from_cache(self, loc):
922         """Fetch a block only if is in the cache, otherwise return None."""
923         slot = self.block_cache.get(loc)
924         if slot is not None and slot.ready.is_set():
925             return slot.get()
926         else:
927             return None
928
    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        """Check whether a block exists in Keep without fetching its data.

        Returns True when the block named by loc_s is available,
        raising the same errors as get() otherwise.  See _get_or_head()
        for the meaning of the arguments.
        """
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
932
    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Fetch one or more blocks of data from Keep.

        loc_s may be a single locator or several comma-separated
        locators; the concatenated block contents are returned.  See
        _get_or_head() for the meaning of the arguments.
        """
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
936
937     def _get_or_head(self, loc_s, method="GET", num_retries=None):
938         """Get data from Keep.
939
940         This method fetches one or more blocks of data from Keep.  It
941         sends a request each Keep service registered with the API
942         server (or the proxy provided when this client was
943         instantiated), then each service named in location hints, in
944         sequence.  As soon as one service provides the data, it's
945         returned.
946
947         Arguments:
948         * loc_s: A string of one or more comma-separated locators to fetch.
949           This method returns the concatenation of these blocks.
950         * num_retries: The number of times to retry GET requests to
951           *each* Keep server if it returns temporary failures, with
952           exponential backoff.  Note that, in each loop, the method may try
953           to fetch data from every available Keep service, along with any
954           that are named in location hints in the locator.  The default value
955           is set when the KeepClient is initialized.
956         """
957         if ',' in loc_s:
958             return ''.join(self.get(x) for x in loc_s.split(','))
959
960         self.get_counter.add(1)
961
962         locator = KeepLocator(loc_s)
963         if method == "GET":
964             slot, first = self.block_cache.reserve_cache(locator.md5sum)
965             if not first:
966                 self.hits_counter.add(1)
967                 v = slot.get()
968                 return v
969
970         self.misses_counter.add(1)
971
972         # If the locator has hints specifying a prefix (indicating a
973         # remote keepproxy) or the UUID of a local gateway service,
974         # read data from the indicated service(s) instead of the usual
975         # list of local disk services.
976         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
977                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
978         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
979                            for hint in locator.hints if (
980                                    hint.startswith('K@') and
981                                    len(hint) == 29 and
982                                    self._gateway_services.get(hint[2:])
983                                    )])
984         # Map root URLs to their KeepService objects.
985         roots_map = {
986             root: self.KeepService(root, self._user_agent_pool,
987                                    upload_counter=self.upload_counter,
988                                    download_counter=self.download_counter)
989             for root in hint_roots
990         }
991
992         # See #3147 for a discussion of the loop implementation.  Highlights:
993         # * Refresh the list of Keep services after each failure, in case
994         #   it's being updated.
995         # * Retry until we succeed, we're out of retries, or every available
996         #   service has returned permanent failure.
997         sorted_roots = []
998         roots_map = {}
999         blob = None
1000         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1001                                backoff_start=2)
1002         for tries_left in loop:
1003             try:
1004                 sorted_roots = self.map_new_services(
1005                     roots_map, locator,
1006                     force_rebuild=(tries_left < num_retries),
1007                     need_writable=False)
1008             except Exception as error:
1009                 loop.save_result(error)
1010                 continue
1011
1012             # Query KeepService objects that haven't returned
1013             # permanent failure, in our specified shuffle order.
1014             services_to_try = [roots_map[root]
1015                                for root in sorted_roots
1016                                if roots_map[root].usable()]
1017             for keep_service in services_to_try:
1018                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1019                 if blob is not None:
1020                     break
1021             loop.save_result((blob, len(services_to_try)))
1022
1023         # Always cache the result, then return it if we succeeded.
1024         if method == "GET":
1025             slot.set(blob)
1026             self.block_cache.cap_cache()
1027         if loop.success():
1028             if method == "HEAD":
1029                 return True
1030             else:
1031                 return blob
1032
1033         # Q: Including 403 is necessary for the Keep tests to continue
1034         # passing, but maybe they should expect KeepReadError instead?
1035         not_founds = sum(1 for key in sorted_roots
1036                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1037         service_errors = ((key, roots_map[key].last_result()['error'])
1038                           for key in sorted_roots)
1039         if not roots_map:
1040             raise arvados.errors.KeepReadError(
1041                 "failed to read {}: no Keep services available ({})".format(
1042                     loc_s, loop.last_result()))
1043         elif not_founds == len(sorted_roots):
1044             raise arvados.errors.NotFoundError(
1045                 "{} not found".format(loc_s), service_errors)
1046         else:
1047             raise arvados.errors.KeepReadError(
1048                 "failed to read {}".format(loc_s), service_errors, label="service")
1049
1050     @retry.retry_method
1051     def put(self, data, copies=2, num_retries=None):
1052         """Save data in Keep.
1053
1054         This method will get a list of Keep services from the API server, and
1055         send the data to each one simultaneously in a new thread.  Once the
1056         uploads are finished, if enough copies are saved, this method returns
1057         the most recent HTTP response body.  If requests fail to upload
1058         enough copies, this method raises KeepWriteError.
1059
1060         Arguments:
1061         * data: The string of data to upload.
1062         * copies: The number of copies that the user requires be saved.
1063           Default 2.
1064         * num_retries: The number of times to retry PUT requests to
1065           *each* Keep server if it returns temporary failures, with
1066           exponential backoff.  The default value is set when the
1067           KeepClient is initialized.
1068         """
1069
1070         if isinstance(data, str):
1071             data = data.encode("ascii")
1072         elif not isinstance(data, str):
1073             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
1074
1075         self.put_counter.add(1)
1076
1077         data_hash = hashlib.md5(data).hexdigest()
1078         loc_s = data_hash + '+' + str(len(data))
1079         if copies < 1:
1080             return loc_s
1081         locator = KeepLocator(loc_s)
1082
1083         headers = {}
1084         # Tell the proxy how many copies we want it to store
1085         headers['X-Keep-Desired-Replicas'] = str(copies)
1086         roots_map = {}
1087         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1088                                backoff_start=2)
1089         done = 0
1090         for tries_left in loop:
1091             try:
1092                 sorted_roots = self.map_new_services(
1093                     roots_map, locator,
1094                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1095             except Exception as error:
1096                 loop.save_result(error)
1097                 continue
1098
1099             writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
1100                                                         data_hash=data_hash,
1101                                                         copies=copies - done,
1102                                                         max_service_replicas=self.max_replicas_per_service,
1103                                                         timeout=self.current_timeout(num_retries - tries_left))
1104             for service_root, ks in [(root, roots_map[root])
1105                                      for root in sorted_roots]:
1106                 if ks.finished():
1107                     continue
1108                 writer_pool.add_task(ks, service_root)
1109             writer_pool.join()
1110             done += writer_pool.done()
1111             loop.save_result((done >= copies, writer_pool.total_task_nr))
1112
1113         if loop.success():
1114             return writer_pool.response()
1115         if not roots_map:
1116             raise arvados.errors.KeepWriteError(
1117                 "failed to write {}: no Keep services available ({})".format(
1118                     data_hash, loop.last_result()))
1119         else:
1120             service_errors = ((key, roots_map[key].last_result()['error'])
1121                               for key in sorted_roots
1122                               if roots_map[key].last_result()['error'])
1123             raise arvados.errors.KeepWriteError(
1124                 "failed to write {} (wanted {} copies but wrote {})".format(
1125                     data_hash, copies, writer_pool.done()), service_errors, label="service")
1126
1127     def local_store_put(self, data, copies=1, num_retries=None):
1128         """A stub for put().
1129
1130         This method is used in place of the real put() method when
1131         using local storage (see constructor's local_store argument).
1132
1133         copies and num_retries arguments are ignored: they are here
1134         only for the sake of offering the same call signature as
1135         put().
1136
1137         Data stored this way can be retrieved via local_store_get().
1138         """
1139         md5 = hashlib.md5(data).hexdigest()
1140         locator = '%s+%d' % (md5, len(data))
1141         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1142             f.write(data)
1143         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1144                   os.path.join(self.local_store, md5))
1145         return locator
1146
1147     def local_store_get(self, loc_s, num_retries=None):
1148         """Companion to local_store_put()."""
1149         try:
1150             locator = KeepLocator(loc_s)
1151         except ValueError:
1152             raise arvados.errors.NotFoundError(
1153                 "Invalid data locator: '%s'" % loc_s)
1154         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1155             return ''
1156         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1157             return f.read()
1158
1159     def is_cached(self, locator):
1160         return self.block_cache.reserve_cache(expect_hash)