import cStringIO
import collections
import datetime
import hashlib
import logging
import math
import os
import pycurl
import Queue
import re
import socket
import ssl
import sys
import threading
import timer
import urlparse

import arvados
import arvados.config as config
import arvados.errors
import arvados.retry as retry
import arvados.util

_logger = logging.getLogger('arvados.keep')
global_client_object = None


# Monkey patch TCP constants when not available (Apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102


class KeepLocator(object):
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

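    # Illustrative note (not part of the original code): assignments to
    # the hex properties above are validated on the spot, e.g.:
    #
    #   loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0')
    #   loc.md5sum = 'not hex'  # raises ValueError
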
    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)
    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except ValueError:
            # Unpacking fails with ValueError when the '@' separator
            # is missing.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is a naive UTC datetime (see the setter above),
            # so compare against UTC "now", not local time.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt

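# Illustrative example (not part of the original code): KeepLocator
# splits a locator string into hash, size, and hints; an A... hint is
# parsed into perm_sig and perm_expiry, other hints are kept verbatim:
#
#   loc = KeepLocator('d41d8cd98f00b204e9800998ecf8427e+0+Kzzzzz')
#   loc.md5sum      # 'd41d8cd98f00b204e9800998ecf8427e'
#   loc.size        # 0
#   loc.hints       # ['Kzzzzz']
#   loc.stripped()  # 'd41d8cd98f00b204e9800998ecf8427e+0'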

class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        return Keep.global_client_object().put(data, **kwargs)


class KeepBlockCache(object):
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum([slot.size() for slot in self._cache])
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in xrange(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                sm = sum([slot.size() for slot in self._cache])

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in xrange(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True

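# Illustrative usage sketch (not part of the original code): callers are
# expected to follow the reserve/fill protocol so that concurrent readers
# of the same block wait on a single fetch.  fetch_block below is a
# hypothetical placeholder for whatever actually retrieves the data:
#
#   cache = KeepBlockCache()
#   slot, first = cache.reserve_cache(locator)
#   if first:
#       slot.set(fetch_block(locator))  # fetch_block is hypothetical
#       cache.cap_cache()
#   data = slot.get()                   # blocks until set() is called
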
class Counter(object):
    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        with self._lk:
            self._val += v

    def get(self):
        with self._lk:
            return self._val


class KeepClient(object):

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)

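    # Illustrative sketch (not part of the original code): the timeout
    # tuples above are (connect_timeout, read_timeout, min_bandwidth).
    # A caller could override them like this:
    #
    #   kc = KeepClient(timeout=(5, 300, 65536))
    #
    # A single number is also accepted and is used for both the connect
    # and read timeouts (see _setcurltimeouts below).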

    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            return self._result

        def _get_user_agent(self):
            try:
                return self._user_agent_pool.get(block=False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            try:
                ua.reset()
                self._user_agent_pool.put(ua, block=False)
            except:
                ua.close()

        def _socket_open(self, *args, **kwargs):
            if len(args) + len(kwargs) == 2:
                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
            else:
                return self._socket_open_pycurl_7_19_3(*args, **kwargs)

        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
            return self._socket_open_pycurl_7_21_5(
                purpose=None,
                address=collections.namedtuple(
                    'Address', ['family', 'socktype', 'protocol', 'addr'],
                )(family, socktype, protocol, address))

        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on Mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, method="GET", timeout=None):
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)

                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                             self._result['status_code'],
                             self._headers.get('content-length'))
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

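        # Worked example (comment only, not part of the original code):
        # with timeouts=(2, 256, 32768), _setcurltimeouts() sets
        # CONNECTTIMEOUT_MS=2000, LOW_SPEED_TIME=256, and
        # LOW_SPEED_LIMIT=32768, i.e. abort if the transfer averages
        # under 32768 bytes/sec over any 256-second window.
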
        def _headerfunction(self, header_line):
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written


    class KeepWriterQueue(Queue.Queue):
        def __init__(self, copies):
            Queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.successful_copies = 0
            self.response = None
            self.successful_copies_lock = threading.Lock()
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise Queue.Empty
                    else:
                        self.pending_tries_notification.wait()


    class KeepWriterThreadPool(object):
        def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
            self.total_task_nr = 0
            self.wanted_copies = copies
            if (not max_service_replicas) or (max_service_replicas >= copies):
                num_threads = 1
            else:
                num_threads = int(math.ceil(float(copies) / max_service_replicas))
            _logger.debug("Pool max threads is %d", num_threads)
            self.workers = []
            self.queue = KeepClient.KeepWriterQueue(copies)
            # Create workers
            for _ in range(num_threads):
                w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                self.workers.append(w)

        def add_task(self, ks, service_root):
            self.queue.put((ks, service_root))
            self.total_task_nr += 1

        def done(self):
            return self.queue.successful_copies

        def join(self):
            # Start workers
            for worker in self.workers:
                worker.start()
            # Wait for finished work
            self.queue.join()

        def response(self):
            return self.queue.response

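    # Illustrative sketch (not part of the original code): put() below
    # drives the writer classes roughly like this:
    #
    #   pool = KeepClient.KeepWriterThreadPool(data, data_hash, copies,
    #                                          max_service_replicas)
    #   for root in sorted_roots:
    #       pool.add_task(roots_map[root], root)
    #   pool.join()
    #   if pool.done() >= copies:
    #       locator = pool.response()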

    class KeepWriterThread(threading.Thread):
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except Queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            success = bool(service.put(self.data_hash,
                                       self.data,
                                       timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            return result['body'].strip(), replicas_stored


    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.
        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you do not want KeepClient to use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        self.lock = threading.Lock()
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urlparse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False

    def current_timeout(self, attempt_number):
        """Return the appropriate timeout to use for this client.

        The proxy timeout setting if the backend service is currently a proxy,
        the regular timeout setting otherwise.  The `attempt_number` indicates
        how many times the operation has been tried already (starting from 0
        for the first try), and scales the connection timeout portion of the
        return value accordingly.

        """
        # TODO(twp): the timeout should be a property of a
        # KeepService, not a KeepClient. See #4488.
        t = self.proxy_timeout if self.using_proxy else self.timeout
        if len(t) == 2:
            return (t[0] * (1 << attempt_number), t[1])
        else:
            return (t[0] * (1 << attempt_number), t[1], t[2])
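
    # Worked example (comment only, not part of the original code):
    # with the default non-proxy timeout (2, 256, 32768),
    # current_timeout(0) returns (2, 256, 32768) and current_timeout(2)
    # returns (8, 256, 32768): only the connection timeout scales with
    # the attempt number.
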
    def _any_nondisk_services(self, service_list):
        return any(ks.get('service_type', 'disk') != 'disk'
                   for ks in service_list)

    def build_services_list(self, force_rebuild=False):
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1

    def _service_weight(self, data_hash, service_uuid):
        """Compute the weight of a Keep service endpoint for a data
        block with a known hash.

        The weight is md5(h + u) where u is the last 15 characters of
        the service endpoint's UUID.
        """
        return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()

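    # Illustrative note (not part of the original code): this is
    # rendezvous (highest-random-weight) hashing, e.g.:
    #
    #   weight = hashlib.md5(data_hash + svc['uuid'][-15:]).hexdigest()
    #
    # Sorting services by this weight gives every client the same probe
    # order for a given block without central coordination.
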
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots

    def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
        # roots_map is a dictionary, mapping Keep service root strings
        # to KeepService objects.  Poll for Keep services, and add any
        # new ones to roots_map.  Return the current list of local
        # root strings.
        headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
        local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
        for root in local_roots:
            if root not in roots_map:
                roots_map[root] = self.KeepService(
                    root, self._user_agent_pool,
                    upload_counter=self.upload_counter,
                    download_counter=self.download_counter,
                    **headers)
        return local_roots

    @staticmethod
    def _check_loop_result(result):
        # KeepClient RetryLoops should save results as a 2-tuple: the
        # actual result of the request, and the number of servers available
        # to receive the request this round.
        # This method returns True if there's a real result, False if
        # there are no more servers available, otherwise None.
        if isinstance(result, Exception):
            return None
        result, tried_server_count = result
        if (result is not None) and (result is not False):
            return True
        elif tried_server_count < 1:
            _logger.info("No more Keep services to try; giving up")
            return False
        else:
            return None

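    # Illustrative note (not part of the original code): the retry loops
    # below call loop.save_result((result, num_services_tried)); for
    # example, save_result((None, 0)) ends the loop as a permanent
    # failure ("no more services"), while save_result((blob, 3)) ends it
    # with success.
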
    def get_from_cache(self, loc):
        """Fetch a block only if it is in the cache, otherwise return None."""
        slot = self.block_cache.get(loc)
        if slot is not None and slot.ready.is_set():
            return slot.get()
        else:
            return None

    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)

    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)

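    # Illustrative usage (not part of the original code):
    #
    #   kc = KeepClient()
    #   data = kc.get('d41d8cd98f00b204e9800998ecf8427e+0')
    #   kc.head('d41d8cd98f00b204e9800998ecf8427e+0')  # True on success
    #
    # Both raise KeepReadError (or NotFoundError) if no service can
    # supply the block.
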
    def _get_or_head(self, loc_s, method="GET", num_retries=None):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then to each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        locator = KeepLocator(loc_s)
        if method == "GET":
            slot, first = self.block_cache.reserve_cache(locator.md5sum)
            if not first:
                self.hits_counter.add(1)
                v = slot.get()
                return v

        self.misses_counter.add(1)

        # If the locator has hints specifying a prefix (indicating a
        # remote keepproxy) or the UUID of a local gateway service,
        # read data from the indicated service(s) instead of the usual
        # list of local disk services.
        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                           for hint in locator.hints if (
                                   hint.startswith('K@') and
                                   len(hint) == 29 and
                                   self._gateway_services.get(hint[2:])
                                   )])
        # Map root URLs to their KeepService objects.
        roots_map = {
            root: self.KeepService(root, self._user_agent_pool,
                                   upload_counter=self.upload_counter,
                                   download_counter=self.download_counter)
            for root in hint_roots
        }

        # See #3147 for a discussion of the loop implementation.  Highlights:
        # * Refresh the list of Keep services after each failure, in case
        #   it's being updated.
        # * Retry until we succeed, we're out of retries, or every available
        #   service has returned permanent failure.
        sorted_roots = []
        roots_map = {}
        blob = None
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=False)
            except Exception as error:
                loop.save_result(error)
                continue

            # Query KeepService objects that haven't returned
            # permanent failure, in our specified shuffle order.
            services_to_try = [roots_map[root]
                               for root in sorted_roots
                               if roots_map[root].usable()]
            for keep_service in services_to_try:
                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                if blob is not None:
                    break
            loop.save_result((blob, len(services_to_try)))

        # Always cache the result, then return it if we succeeded.
        if method == "GET":
            slot.set(blob)
            self.block_cache.cap_cache()
        if loop.success():
            if method == "HEAD":
                return True
            else:
                return blob

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "failed to read {}: no Keep services available ({})".format(
                    loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "{} not found".format(loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "failed to read {}".format(loc_s), service_errors, label="service")

    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            writer_pool = KeepClient.KeepWriterThreadPool(
                data=data,
                data_hash=data_hash,
                copies=copies - done,
                max_service_replicas=self.max_replicas_per_service,
                timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")

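    # Illustrative usage (not part of the original code):
    #
    #   kc = KeepClient()
    #   locator = kc.put('foo', copies=2)
    #   assert kc.get(locator) == 'foo'
    #
    # The returned locator is the server's response body, which
    # typically includes a permission signature, e.g.
    # 'acbd18db4cc2f85cedef654fccc4a4d8+3+A...'.
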
    def local_store_put(self, data, copies=1, num_retries=None):
        """A stub for put().

        This method is used in place of the real put() method when
        using local storage (see constructor's local_store argument).

        copies and num_retries arguments are ignored: they are here
        only for the sake of offering the same call signature as
        put().

        Data stored this way can be retrieved via local_store_get().
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
            f.write(data)
        os.rename(os.path.join(self.local_store, md5 + '.tmp'),
                  os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s, num_retries=None):
        """Companion to local_store_put()."""
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
            return f.read()

    def is_cached(self, locator):
        return self.block_cache.reserve_cache(locator)