# Commit 11308: Futurize.
# [arvados.git] / sdk / python / arvados / keep.py
1 from __future__ import absolute_import
2 import cStringIO
3 import datetime
4 import hashlib
5 import logging
6 import math
7 import os
8 import pycurl
9 import Queue
10 import re
11 import socket
12 import ssl
13 import sys
14 import threading
15 from . import timer
16 import urlparse
17
18 import arvados
19 import arvados.config as config
20 import arvados.errors
21 import arvados.retry as retry
22 import arvados.util
23
24 _logger = logging.getLogger('arvados.keep')
25 global_client_object = None
26
27
28 # Monkey patch TCP constants when not available (apple). Values sourced from:
29 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
30 if sys.platform == 'darwin':
31     if not hasattr(socket, 'TCP_KEEPALIVE'):
32         socket.TCP_KEEPALIVE = 0x010
33     if not hasattr(socket, 'TCP_KEEPINTVL'):
34         socket.TCP_KEEPINTVL = 0x101
35     if not hasattr(socket, 'TCP_KEEPCNT'):
36         socket.TCP_KEEPCNT = 0x102
37
38
class KeepLocator(object):
    """Parse, validate, and reassemble a single Keep block locator.

    A locator string looks like "<md5 hex digest>[+<size>][+<hint>...]".
    A hint starting with 'A' carries the permission signature and its
    expiry timestamp; other hints are kept verbatim.
    """

    # Reference point for converting expiry datetimes back to Unix time.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # Hints: one uppercase letter followed by alphanumerics, '@', '_' or '-'.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Split locator_str into md5sum, optional size, and hints.

        Raises ValueError if any component is malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            # NOTE(review): if the second component is a hint rather than a
            # size, int() raises ValueError here -- confirm callers always
            # pass sized locators before relaxing this.
            self.size = int(next(pieces))
        except StopIteration:
            # Bare "md5sum" locator: no size component at all.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        # Stored as a naive datetime in UTC.
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<hex time>" hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from an "A<sig>@<hex time>" hint.

        Raises ValueError with a helpful message when the hint is
        malformed.
        """
        # Fix: a hint with no '@' used to raise a bare unpacking
        # ValueError -- the old `except IndexError` clause never fired,
        # because s[1:] and split() cannot raise IndexError.
        parts = s[1:].split('@', 1)
        if len(parts) != 2:
            raise ValueError("bad permission hint {}".format(s))
        # Property setters validate the hex fields and raise their own,
        # more specific, ValueError messages.
        self.perm_sig, self.perm_expiry = parts

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        as_of_dt, if given, must be a naive UTC datetime to compare
        against.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # Fix: perm_expiry is naive UTC (utcfromtimestamp above), so
            # compare against UTC now; the old datetime.now() compared
            # local time, mis-judging expiry by the local UTC offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
120
121
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with
    your own API client.  The global KeepClient will build an API client
    from the current Arvados configuration, which may not match the one
    you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # KeepClient no longer adapts to configuration changes at runtime.
        # Emulate the old behavior by rebuilding the shared client whenever
        # any relevant setting differs from the previous call.
        current_key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is None or current_key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = current_key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block via the shared client. DEPRECATED."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block via the shared client. DEPRECATED."""
        return Keep.global_client_object().put(data, **kwargs)
156
class KeepBlockCache(object):
    """In-memory LRU cache of Keep block contents.

    Slots are kept in a list ordered most-recently-used first.  The
    default capacity is 256 MiB of block data.
    """

    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max      # byte budget for cached content
        self._cache = []                # CacheSlot list, MRU first
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block: readers wait until a writer fills it."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the slot is filled, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store value and wake any readers waiting in get()."""
            self.content = value
            self.ready.set()

        def size(self):
            """Number of cached bytes in this slot (0 while unfilled)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                # Evict the least recently used slot that is ready.
                # (Fix for py3 compatibility: range instead of xrange --
                # iteration behavior is identical under py2.)
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Test if the locator is already in the cache; a hit is promoted
        # to the front of the list (most-recently-used position).
        # Caller must hold self._cache_lock.
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if absent."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
228
class Counter(object):
    """An integer counter that can be updated safely from many threads."""

    def __init__(self, v=0):
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically add v (which may be negative) to the counter."""
        with self._lock:
            self._value += v

    def get(self):
        """Return the current count."""
        with self._lock:
            return self._value
241
242
243 class KeepClient(object):
244
245     # Default Keep server connection timeout:  2 seconds
246     # Default Keep server read timeout:       256 seconds
247     # Default Keep server bandwidth minimum:  32768 bytes per second
248     # Default Keep proxy connection timeout:  20 seconds
249     # Default Keep proxy read timeout:        256 seconds
250     # Default Keep proxy bandwidth minimum:   32768 bytes per second
251     DEFAULT_TIMEOUT = (2, 256, 32768)
252     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
253
254
255     class KeepService(object):
256         """Make requests to a single Keep service, and track results.
257
258         A KeepService is intended to last long enough to perform one
259         transaction (GET or PUT) against one Keep service. This can
260         involve calling either get() or put() multiple times in order
261         to retry after transient failures. However, calling both get()
262         and put() on a single instance -- or using the same instance
263         to access two different Keep services -- will not produce
264         sensible behavior.
265         """
266
267         HTTP_ERRORS = (
268             socket.error,
269             ssl.SSLError,
270             arvados.errors.HttpError,
271         )
272
273         def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
274                      upload_counter=None,
275                      download_counter=None, **headers):
276             self.root = root
277             self._user_agent_pool = user_agent_pool
278             self._result = {'error': None}
279             self._usable = True
280             self._session = None
281             self.get_headers = {'Accept': 'application/octet-stream'}
282             self.get_headers.update(headers)
283             self.put_headers = headers
284             self.upload_counter = upload_counter
285             self.download_counter = download_counter
286
287         def usable(self):
288             """Is it worth attempting a request?"""
289             return self._usable
290
291         def finished(self):
292             """Did the request succeed or encounter permanent failure?"""
293             return self._result['error'] == False or not self._usable
294
295         def last_result(self):
296             return self._result
297
298         def _get_user_agent(self):
299             try:
300                 return self._user_agent_pool.get(block=False)
301             except Queue.Empty:
302                 return pycurl.Curl()
303
304         def _put_user_agent(self, ua):
305             try:
306                 ua.reset()
307                 self._user_agent_pool.put(ua, block=False)
308             except:
309                 ua.close()
310
311         @staticmethod
312         def _socket_open(family, socktype, protocol, address=None):
313             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
314             s = socket.socket(family, socktype, protocol)
315             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
316             # Will throw invalid protocol error on mac. This test prevents that.
317             if hasattr(socket, 'TCP_KEEPIDLE'):
318                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
319             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
320             return s
321
322         def get(self, locator, method="GET", timeout=None):
323             # locator is a KeepLocator object.
324             url = self.root + str(locator)
325             _logger.debug("Request: %s %s", method, url)
326             curl = self._get_user_agent()
327             ok = None
328             try:
329                 with timer.Timer() as t:
330                     self._headers = {}
331                     response_body = cStringIO.StringIO()
332                     curl.setopt(pycurl.NOSIGNAL, 1)
333                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
334                     curl.setopt(pycurl.URL, url.encode('utf-8'))
335                     curl.setopt(pycurl.HTTPHEADER, [
336                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
337                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
338                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
339                     if method == "HEAD":
340                         curl.setopt(pycurl.NOBODY, True)
341                     self._setcurltimeouts(curl, timeout)
342
343                     try:
344                         curl.perform()
345                     except Exception as e:
346                         raise arvados.errors.HttpError(0, str(e))
347                     self._result = {
348                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
349                         'body': response_body.getvalue(),
350                         'headers': self._headers,
351                         'error': False,
352                     }
353
354                 ok = retry.check_http_response_success(self._result['status_code'])
355                 if not ok:
356                     self._result['error'] = arvados.errors.HttpError(
357                         self._result['status_code'],
358                         self._headers.get('x-status-line', 'Error'))
359             except self.HTTP_ERRORS as e:
360                 self._result = {
361                     'error': e,
362                 }
363             self._usable = ok != False
364             if self._result.get('status_code', None):
365                 # The client worked well enough to get an HTTP status
366                 # code, so presumably any problems are just on the
367                 # server side and it's OK to reuse the client.
368                 self._put_user_agent(curl)
369             else:
370                 # Don't return this client to the pool, in case it's
371                 # broken.
372                 curl.close()
373             if not ok:
374                 _logger.debug("Request fail: GET %s => %s: %s",
375                               url, type(self._result['error']), str(self._result['error']))
376                 return None
377             if method == "HEAD":
378                 _logger.info("HEAD %s: %s bytes",
379                          self._result['status_code'],
380                          self._result.get('content-length'))
381                 return True
382
383             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
384                          self._result['status_code'],
385                          len(self._result['body']),
386                          t.msecs,
387                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
388
389             if self.download_counter:
390                 self.download_counter.add(len(self._result['body']))
391             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
392             if resp_md5 != locator.md5sum:
393                 _logger.warning("Checksum fail: md5(%s) = %s",
394                                 url, resp_md5)
395                 self._result['error'] = arvados.errors.HttpError(
396                     0, 'Checksum fail')
397                 return None
398             return self._result['body']
399
400         def put(self, hash_s, body, timeout=None):
401             url = self.root + hash_s
402             _logger.debug("Request: PUT %s", url)
403             curl = self._get_user_agent()
404             ok = None
405             try:
406                 with timer.Timer() as t:
407                     self._headers = {}
408                     body_reader = cStringIO.StringIO(body)
409                     response_body = cStringIO.StringIO()
410                     curl.setopt(pycurl.NOSIGNAL, 1)
411                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
412                     curl.setopt(pycurl.URL, url.encode('utf-8'))
413                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
414                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
415                     # response) instead of sending the request body immediately.
416                     # This allows the server to reject the request if the request
417                     # is invalid or the server is read-only, without waiting for
418                     # the client to send the entire block.
419                     curl.setopt(pycurl.UPLOAD, True)
420                     curl.setopt(pycurl.INFILESIZE, len(body))
421                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
422                     curl.setopt(pycurl.HTTPHEADER, [
423                         '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
424                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
425                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
426                     self._setcurltimeouts(curl, timeout)
427                     try:
428                         curl.perform()
429                     except Exception as e:
430                         raise arvados.errors.HttpError(0, str(e))
431                     self._result = {
432                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
433                         'body': response_body.getvalue(),
434                         'headers': self._headers,
435                         'error': False,
436                     }
437                 ok = retry.check_http_response_success(self._result['status_code'])
438                 if not ok:
439                     self._result['error'] = arvados.errors.HttpError(
440                         self._result['status_code'],
441                         self._headers.get('x-status-line', 'Error'))
442             except self.HTTP_ERRORS as e:
443                 self._result = {
444                     'error': e,
445                 }
446             self._usable = ok != False # still usable if ok is True or None
447             if self._result.get('status_code', None):
448                 # Client is functional. See comment in get().
449                 self._put_user_agent(curl)
450             else:
451                 curl.close()
452             if not ok:
453                 _logger.debug("Request fail: PUT %s => %s: %s",
454                               url, type(self._result['error']), str(self._result['error']))
455                 return False
456             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
457                          self._result['status_code'],
458                          len(body),
459                          t.msecs,
460                          (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
461             if self.upload_counter:
462                 self.upload_counter.add(len(body))
463             return True
464
465         def _setcurltimeouts(self, curl, timeouts):
466             if not timeouts:
467                 return
468             elif isinstance(timeouts, tuple):
469                 if len(timeouts) == 2:
470                     conn_t, xfer_t = timeouts
471                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
472                 else:
473                     conn_t, xfer_t, bandwidth_bps = timeouts
474             else:
475                 conn_t, xfer_t = (timeouts, timeouts)
476                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
477             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
478             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
479             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
480
481         def _headerfunction(self, header_line):
482             header_line = header_line.decode('iso-8859-1')
483             if ':' in header_line:
484                 name, value = header_line.split(':', 1)
485                 name = name.strip().lower()
486                 value = value.strip()
487             elif self._headers:
488                 name = self._lastheadername
489                 value = self._headers[name] + ' ' + header_line.strip()
490             elif header_line.startswith('HTTP/'):
491                 name = 'x-status-line'
492                 value = header_line
493             else:
494                 _logger.error("Unexpected header line: %s", header_line)
495                 return
496             self._lastheadername = name
497             self._headers[name] = value
498             # Returning None implies all bytes were written
499     
500
    class KeepWriterQueue(Queue.Queue):
        """Task queue coordinating KeepWriterThread workers.

        Each queued task is a (KeepService, service_root) pair.
        pending_tries acts as a pool of "attempt tokens": a worker
        consumes one token to start a PUT, and a failed attempt returns
        its token so another worker/service pair can be tried until the
        wanted number of copies is reached.
        """

        def __init__(self, copies):
            Queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies            # replicas requested by caller
            self.successful_copies = 0             # replicas confirmed so far
            self.response = None                   # locator from the last success
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies            # attempt tokens available
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            # Record replicas_nr new copies, then wake all waiting workers
            # so they can re-check whether more copies are still needed.
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # A failed attempt returns its token; wake one waiting worker
            # to retry with a different service.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            # Replicas still needed; may be <= 0 once enough are stored.
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            """Return the next (service, service_root) pair to try.

            Blocks while other attempts are in flight; raises Queue.Empty
            when enough copies have been stored or no services remain.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        # Consume one attempt token and hand out a task,
                        # skipping services that already finished.
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        # No tokens and no queued services: give up.
                        self.pending_tries_notification.notify_all()
                        raise Queue.Empty
                    else:
                        # Tasks remain but all tokens are in use; wait for
                        # a success or failure notification.
                        self.pending_tries_notification.wait()
554
555
556     class KeepWriterThreadPool(object):
557         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
558             self.total_task_nr = 0
559             self.wanted_copies = copies
560             if (not max_service_replicas) or (max_service_replicas >= copies):
561                 num_threads = 1
562             else:
563                 num_threads = int(math.ceil(float(copies) / max_service_replicas))
564             _logger.debug("Pool max threads is %d", num_threads)
565             self.workers = []
566             self.queue = KeepClient.KeepWriterQueue(copies)
567             # Create workers
568             for _ in range(num_threads):
569                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
570                 self.workers.append(w)
571         
572         def add_task(self, ks, service_root):
573             self.queue.put((ks, service_root))
574             self.total_task_nr += 1
575         
576         def done(self):
577             return self.queue.successful_copies
578         
579         def join(self):
580             # Start workers
581             for worker in self.workers:
582                 worker.start()
583             # Wait for finished work
584             self.queue.join()
585         
586         def response(self):
587             return self.queue.response
588     
589     
590     class KeepWriterThread(threading.Thread):
591         TaskFailed = RuntimeError()
592
593         def __init__(self, queue, data, data_hash, timeout=None):
594             super(KeepClient.KeepWriterThread, self).__init__()
595             self.timeout = timeout
596             self.queue = queue
597             self.data = data
598             self.data_hash = data_hash
599             self.daemon = True
600
601         def run(self):
602             while True:
603                 try:
604                     service, service_root = self.queue.get_next_task()
605                 except Queue.Empty:
606                     return
607                 try:
608                     locator, copies = self.do_task(service, service_root)
609                 except Exception as e:
610                     if e is not self.TaskFailed:
611                         _logger.exception("Exception in KeepWriterThread")
612                     self.queue.write_fail(service)
613                 else:
614                     self.queue.write_success(locator, copies)
615                 finally:
616                     self.queue.task_done()
617
618         def do_task(self, service, service_root):
619             success = bool(service.put(self.data_hash,
620                                         self.data,
621                                         timeout=self.timeout))
622             result = service.last_result()
623
624             if not success:
625                 if result.get('status_code', None):
626                     _logger.debug("Request fail: PUT %s => %s %s",
627                                   self.data_hash,
628                                   result['status_code'],
629                                   result['body'])
630                 raise self.TaskFailed
631
632             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
633                           str(threading.current_thread()),
634                           self.data_hash,
635                           len(self.data),
636                           service_root)
637             try:
638                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
639             except (KeyError, ValueError):
640                 replicas_stored = 1
641
642             return result['body'].strip(), replicas_stored
643
644
645     def __init__(self, api_client=None, proxy=None,
646                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
647                  api_token=None, local_store=None, block_cache=None,
648                  num_retries=0, session=None):
649         """Initialize a new KeepClient.
650
651         Arguments:
652         :api_client:
653           The API client to use to find Keep services.  If not
654           provided, KeepClient will build one from available Arvados
655           configuration.
656
657         :proxy:
658           If specified, this KeepClient will send requests to this Keep
659           proxy.  Otherwise, KeepClient will fall back to the setting of the
660           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
661           If you want to KeepClient does not use a proxy, pass in an empty
662           string.
663
664         :timeout:
665           The initial timeout (in seconds) for HTTP requests to Keep
666           non-proxy servers.  A tuple of three floats is interpreted as
667           (connection_timeout, read_timeout, minimum_bandwidth). A connection
668           will be aborted if the average traffic rate falls below
669           minimum_bandwidth bytes per second over an interval of read_timeout
670           seconds. Because timeouts are often a result of transient server
671           load, the actual connection timeout will be increased by a factor
672           of two on each retry.
673           Default: (2, 256, 32768).
674
675         :proxy_timeout:
676           The initial timeout (in seconds) for HTTP requests to
677           Keep proxies. A tuple of three floats is interpreted as
678           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
679           described above for adjusting connection timeouts on retry also
680           applies.
681           Default: (20, 256, 32768).
682
683         :api_token:
684           If you're not using an API client, but only talking
685           directly to a Keep proxy, this parameter specifies an API token
686           to authenticate Keep requests.  It is an error to specify both
687           api_client and api_token.  If you specify neither, KeepClient
688           will use one available from the Arvados configuration.
689
690         :local_store:
691           If specified, this KeepClient will bypass Keep
692           services, and save data to the named directory.  If unspecified,
693           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
694           environment variable.  If you want to ensure KeepClient does not
695           use local storage, pass in an empty string.  This is primarily
696           intended to mock a server for testing.
697
698         :num_retries:
699           The default number of times to retry failed requests.
700           This will be used as the default num_retries value when get() and
701           put() are called.  Default 0.
702         """
703         self.lock = threading.Lock()
704         if proxy is None:
705             if config.get('ARVADOS_KEEP_SERVICES'):
706                 proxy = config.get('ARVADOS_KEEP_SERVICES')
707             else:
708                 proxy = config.get('ARVADOS_KEEP_PROXY')
709         if api_token is None:
710             if api_client is None:
711                 api_token = config.get('ARVADOS_API_TOKEN')
712             else:
713                 api_token = api_client.api_token
714         elif api_client is not None:
715             raise ValueError(
716                 "can't build KeepClient with both API client and token")
717         if local_store is None:
718             local_store = os.environ.get('KEEP_LOCAL_STORE')
719
720         self.block_cache = block_cache if block_cache else KeepBlockCache()
721         self.timeout = timeout
722         self.proxy_timeout = proxy_timeout
723         self._user_agent_pool = Queue.LifoQueue()
724         self.upload_counter = Counter()
725         self.download_counter = Counter()
726         self.put_counter = Counter()
727         self.get_counter = Counter()
728         self.hits_counter = Counter()
729         self.misses_counter = Counter()
730
731         if local_store:
732             self.local_store = local_store
733             self.get = self.local_store_get
734             self.put = self.local_store_put
735         else:
736             self.num_retries = num_retries
737             self.max_replicas_per_service = None
738             if proxy:
739                 proxy_uris = proxy.split()
740                 for i in range(len(proxy_uris)):
741                     if not proxy_uris[i].endswith('/'):
742                         proxy_uris[i] += '/'
743                     # URL validation
744                     url = urlparse.urlparse(proxy_uris[i])
745                     if not (url.scheme and url.netloc):
746                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
747                 self.api_token = api_token
748                 self._gateway_services = {}
749                 self._keep_services = [{
750                     'uuid': "00000-bi6l4-%015d" % idx,
751                     'service_type': 'proxy',
752                     '_service_root': uri,
753                     } for idx, uri in enumerate(proxy_uris)]
754                 self._writable_services = self._keep_services
755                 self.using_proxy = True
756                 self._static_services_list = True
757             else:
758                 # It's important to avoid instantiating an API client
759                 # unless we actually need one, for testing's sake.
760                 if api_client is None:
761                     api_client = arvados.api('v1')
762                 self.api_client = api_client
763                 self.api_token = api_client.api_token
764                 self._gateway_services = {}
765                 self._keep_services = None
766                 self._writable_services = None
767                 self.using_proxy = None
768                 self._static_services_list = False
769
770     def current_timeout(self, attempt_number):
771         """Return the appropriate timeout to use for this client.
772
773         The proxy timeout setting if the backend service is currently a proxy,
774         the regular timeout setting otherwise.  The `attempt_number` indicates
775         how many times the operation has been tried already (starting from 0
776         for the first try), and scales the connection timeout portion of the
777         return value accordingly.
778
779         """
780         # TODO(twp): the timeout should be a property of a
781         # KeepService, not a KeepClient. See #4488.
782         t = self.proxy_timeout if self.using_proxy else self.timeout
783         if len(t) == 2:
784             return (t[0] * (1 << attempt_number), t[1])
785         else:
786             return (t[0] * (1 << attempt_number), t[1], t[2])
787     def _any_nondisk_services(self, service_list):
788         return any(ks.get('service_type', 'disk') != 'disk'
789                    for ks in service_list)
790
791     def build_services_list(self, force_rebuild=False):
792         if (self._static_services_list or
793               (self._keep_services and not force_rebuild)):
794             return
795         with self.lock:
796             try:
797                 keep_services = self.api_client.keep_services().accessible()
798             except Exception:  # API server predates Keep services.
799                 keep_services = self.api_client.keep_disks().list()
800
801             # Gateway services are only used when specified by UUID,
802             # so there's nothing to gain by filtering them by
803             # service_type.
804             self._gateway_services = {ks['uuid']: ks for ks in
805                                       keep_services.execute()['items']}
806             if not self._gateway_services:
807                 raise arvados.errors.NoKeepServersError()
808
809             # Precompute the base URI for each service.
810             for r in self._gateway_services.itervalues():
811                 host = r['service_host']
812                 if not host.startswith('[') and host.find(':') >= 0:
813                     # IPv6 URIs must be formatted like http://[::1]:80/...
814                     host = '[' + host + ']'
815                 r['_service_root'] = "{}://{}:{:d}/".format(
816                     'https' if r['service_ssl_flag'] else 'http',
817                     host,
818                     r['service_port'])
819
820             _logger.debug(str(self._gateway_services))
821             self._keep_services = [
822                 ks for ks in self._gateway_services.itervalues()
823                 if not ks.get('service_type', '').startswith('gateway:')]
824             self._writable_services = [ks for ks in self._keep_services
825                                        if not ks.get('read_only')]
826
827             # For disk type services, max_replicas_per_service is 1
828             # It is unknown (unlimited) for other service types.
829             if self._any_nondisk_services(self._writable_services):
830                 self.max_replicas_per_service = None
831             else:
832                 self.max_replicas_per_service = 1
833
834     def _service_weight(self, data_hash, service_uuid):
835         """Compute the weight of a Keep service endpoint for a data
836         block with a known hash.
837
838         The weight is md5(h + u) where u is the last 15 characters of
839         the service endpoint's UUID.
840         """
841         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
842
843     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
844         """Return an array of Keep service endpoints, in the order in
845         which they should be probed when reading or writing data with
846         the given hash+hints.
847         """
848         self.build_services_list(force_rebuild)
849
850         sorted_roots = []
851         # Use the services indicated by the given +K@... remote
852         # service hints, if any are present and can be resolved to a
853         # URI.
854         for hint in locator.hints:
855             if hint.startswith('K@'):
856                 if len(hint) == 7:
857                     sorted_roots.append(
858                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
859                 elif len(hint) == 29:
860                     svc = self._gateway_services.get(hint[2:])
861                     if svc:
862                         sorted_roots.append(svc['_service_root'])
863
864         # Sort the available local services by weight (heaviest first)
865         # for this locator, and return their service_roots (base URIs)
866         # in that order.
867         use_services = self._keep_services
868         if need_writable:
869             use_services = self._writable_services
870         self.using_proxy = self._any_nondisk_services(use_services)
871         sorted_roots.extend([
872             svc['_service_root'] for svc in sorted(
873                 use_services,
874                 reverse=True,
875                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
876         _logger.debug("{}: {}".format(locator, sorted_roots))
877         return sorted_roots
878
879     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
880         # roots_map is a dictionary, mapping Keep service root strings
881         # to KeepService objects.  Poll for Keep services, and add any
882         # new ones to roots_map.  Return the current list of local
883         # root strings.
884         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
885         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
886         for root in local_roots:
887             if root not in roots_map:
888                 roots_map[root] = self.KeepService(
889                     root, self._user_agent_pool,
890                     upload_counter=self.upload_counter,
891                     download_counter=self.download_counter,
892                     **headers)
893         return local_roots
894
895     @staticmethod
896     def _check_loop_result(result):
897         # KeepClient RetryLoops should save results as a 2-tuple: the
898         # actual result of the request, and the number of servers available
899         # to receive the request this round.
900         # This method returns True if there's a real result, False if
901         # there are no more servers available, otherwise None.
902         if isinstance(result, Exception):
903             return None
904         result, tried_server_count = result
905         if (result is not None) and (result is not False):
906             return True
907         elif tried_server_count < 1:
908             _logger.info("No more Keep services to try; giving up")
909             return False
910         else:
911             return None
912
913     def get_from_cache(self, loc):
914         """Fetch a block only if is in the cache, otherwise return None."""
915         slot = self.block_cache.get(loc)
916         if slot is not None and slot.ready.is_set():
917             return slot.get()
918         else:
919             return None
920
    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        """Check whether a block exists in Keep; returns True on success.

        Thin wrapper around _get_or_head with method="HEAD"; see that
        method for argument details and the errors raised on failure.
        """
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
924
    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Fetch a block of data from Keep and return its contents.

        Thin wrapper around _get_or_head with method="GET"; see that
        method for argument details and the errors raised on failure.
        """
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
928
929     def _get_or_head(self, loc_s, method="GET", num_retries=None):
930         """Get data from Keep.
931
932         This method fetches one or more blocks of data from Keep.  It
933         sends a request each Keep service registered with the API
934         server (or the proxy provided when this client was
935         instantiated), then each service named in location hints, in
936         sequence.  As soon as one service provides the data, it's
937         returned.
938
939         Arguments:
940         * loc_s: A string of one or more comma-separated locators to fetch.
941           This method returns the concatenation of these blocks.
942         * num_retries: The number of times to retry GET requests to
943           *each* Keep server if it returns temporary failures, with
944           exponential backoff.  Note that, in each loop, the method may try
945           to fetch data from every available Keep service, along with any
946           that are named in location hints in the locator.  The default value
947           is set when the KeepClient is initialized.
948         """
949         if ',' in loc_s:
950             return ''.join(self.get(x) for x in loc_s.split(','))
951
952         self.get_counter.add(1)
953
954         locator = KeepLocator(loc_s)
955         if method == "GET":
956             slot, first = self.block_cache.reserve_cache(locator.md5sum)
957             if not first:
958                 self.hits_counter.add(1)
959                 v = slot.get()
960                 return v
961
962         self.misses_counter.add(1)
963
964         # If the locator has hints specifying a prefix (indicating a
965         # remote keepproxy) or the UUID of a local gateway service,
966         # read data from the indicated service(s) instead of the usual
967         # list of local disk services.
968         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
969                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
970         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
971                            for hint in locator.hints if (
972                                    hint.startswith('K@') and
973                                    len(hint) == 29 and
974                                    self._gateway_services.get(hint[2:])
975                                    )])
976         # Map root URLs to their KeepService objects.
977         roots_map = {
978             root: self.KeepService(root, self._user_agent_pool,
979                                    upload_counter=self.upload_counter,
980                                    download_counter=self.download_counter)
981             for root in hint_roots
982         }
983
984         # See #3147 for a discussion of the loop implementation.  Highlights:
985         # * Refresh the list of Keep services after each failure, in case
986         #   it's being updated.
987         # * Retry until we succeed, we're out of retries, or every available
988         #   service has returned permanent failure.
989         sorted_roots = []
990         roots_map = {}
991         blob = None
992         loop = retry.RetryLoop(num_retries, self._check_loop_result,
993                                backoff_start=2)
994         for tries_left in loop:
995             try:
996                 sorted_roots = self.map_new_services(
997                     roots_map, locator,
998                     force_rebuild=(tries_left < num_retries),
999                     need_writable=False)
1000             except Exception as error:
1001                 loop.save_result(error)
1002                 continue
1003
1004             # Query KeepService objects that haven't returned
1005             # permanent failure, in our specified shuffle order.
1006             services_to_try = [roots_map[root]
1007                                for root in sorted_roots
1008                                if roots_map[root].usable()]
1009             for keep_service in services_to_try:
1010                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1011                 if blob is not None:
1012                     break
1013             loop.save_result((blob, len(services_to_try)))
1014
1015         # Always cache the result, then return it if we succeeded.
1016         if method == "GET":
1017             slot.set(blob)
1018             self.block_cache.cap_cache()
1019         if loop.success():
1020             if method == "HEAD":
1021                 return True
1022             else:
1023                 return blob
1024
1025         # Q: Including 403 is necessary for the Keep tests to continue
1026         # passing, but maybe they should expect KeepReadError instead?
1027         not_founds = sum(1 for key in sorted_roots
1028                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1029         service_errors = ((key, roots_map[key].last_result()['error'])
1030                           for key in sorted_roots)
1031         if not roots_map:
1032             raise arvados.errors.KeepReadError(
1033                 "failed to read {}: no Keep services available ({})".format(
1034                     loc_s, loop.last_result()))
1035         elif not_founds == len(sorted_roots):
1036             raise arvados.errors.NotFoundError(
1037                 "{} not found".format(loc_s), service_errors)
1038         else:
1039             raise arvados.errors.KeepReadError(
1040                 "failed to read {}".format(loc_s), service_errors, label="service")
1041
1042     @retry.retry_method
1043     def put(self, data, copies=2, num_retries=None):
1044         """Save data in Keep.
1045
1046         This method will get a list of Keep services from the API server, and
1047         send the data to each one simultaneously in a new thread.  Once the
1048         uploads are finished, if enough copies are saved, this method returns
1049         the most recent HTTP response body.  If requests fail to upload
1050         enough copies, this method raises KeepWriteError.
1051
1052         Arguments:
1053         * data: The string of data to upload.
1054         * copies: The number of copies that the user requires be saved.
1055           Default 2.
1056         * num_retries: The number of times to retry PUT requests to
1057           *each* Keep server if it returns temporary failures, with
1058           exponential backoff.  The default value is set when the
1059           KeepClient is initialized.
1060         """
1061
1062         if isinstance(data, unicode):
1063             data = data.encode("ascii")
1064         elif not isinstance(data, str):
1065             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
1066
1067         self.put_counter.add(1)
1068
1069         data_hash = hashlib.md5(data).hexdigest()
1070         loc_s = data_hash + '+' + str(len(data))
1071         if copies < 1:
1072             return loc_s
1073         locator = KeepLocator(loc_s)
1074
1075         headers = {}
1076         # Tell the proxy how many copies we want it to store
1077         headers['X-Keep-Desired-Replicas'] = str(copies)
1078         roots_map = {}
1079         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1080                                backoff_start=2)
1081         done = 0
1082         for tries_left in loop:
1083             try:
1084                 sorted_roots = self.map_new_services(
1085                     roots_map, locator,
1086                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1087             except Exception as error:
1088                 loop.save_result(error)
1089                 continue
1090
1091             writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
1092                                                         data_hash=data_hash,
1093                                                         copies=copies - done,
1094                                                         max_service_replicas=self.max_replicas_per_service,
1095                                                         timeout=self.current_timeout(num_retries - tries_left))
1096             for service_root, ks in [(root, roots_map[root])
1097                                      for root in sorted_roots]:
1098                 if ks.finished():
1099                     continue
1100                 writer_pool.add_task(ks, service_root)
1101             writer_pool.join()
1102             done += writer_pool.done()
1103             loop.save_result((done >= copies, writer_pool.total_task_nr))
1104
1105         if loop.success():
1106             return writer_pool.response()
1107         if not roots_map:
1108             raise arvados.errors.KeepWriteError(
1109                 "failed to write {}: no Keep services available ({})".format(
1110                     data_hash, loop.last_result()))
1111         else:
1112             service_errors = ((key, roots_map[key].last_result()['error'])
1113                               for key in sorted_roots
1114                               if roots_map[key].last_result()['error'])
1115             raise arvados.errors.KeepWriteError(
1116                 "failed to write {} (wanted {} copies but wrote {})".format(
1117                     data_hash, copies, writer_pool.done()), service_errors, label="service")
1118
1119     def local_store_put(self, data, copies=1, num_retries=None):
1120         """A stub for put().
1121
1122         This method is used in place of the real put() method when
1123         using local storage (see constructor's local_store argument).
1124
1125         copies and num_retries arguments are ignored: they are here
1126         only for the sake of offering the same call signature as
1127         put().
1128
1129         Data stored this way can be retrieved via local_store_get().
1130         """
1131         md5 = hashlib.md5(data).hexdigest()
1132         locator = '%s+%d' % (md5, len(data))
1133         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1134             f.write(data)
1135         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1136                   os.path.join(self.local_store, md5))
1137         return locator
1138
1139     def local_store_get(self, loc_s, num_retries=None):
1140         """Companion to local_store_put()."""
1141         try:
1142             locator = KeepLocator(loc_s)
1143         except ValueError:
1144             raise arvados.errors.NotFoundError(
1145                 "Invalid data locator: '%s'" % loc_s)
1146         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1147             return ''
1148         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1149             return f.read()
1150
1151     def is_cached(self, locator):
1152         return self.block_cache.reserve_cache(expect_hash)