Merge branch '10467-client-disconnect' refs #10467
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import sys
13 import threading
14 import timer
15 import urlparse
16
17 import arvados
18 import arvados.config as config
19 import arvados.errors
20 import arvados.retry as retry
21 import arvados.util
22
# Module-level logger for all Keep client activity.
_logger = logging.getLogger('arvados.keep')
# Lazily-built singleton KeepClient shared by the deprecated Keep class
# (see Keep.global_client_object below).
global_client_object = None
25
26
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    # Keep any value the socket module already defines; only fill in
    # the constants that are missing.
    socket.TCP_KEEPALIVE = getattr(socket, 'TCP_KEEPALIVE', 0x010)
    socket.TCP_KEEPINTVL = getattr(socket, 'TCP_KEEPINTVL', 0x101)
    socket.TCP_KEEPCNT = getattr(socket, 'TCP_KEEPCNT', 0x102)
37
class KeepLocator(object):
    """One parsed Keep locator: ``<md5>[+<size>][+<hint>...]``.

    Permission hints (``A<signature>@<hex timestamp>``) are parsed into
    perm_sig/perm_expiry; all other hints are kept verbatim in `hints`.
    """
    # Reference point for converting perm_expiry to/from a hex Unix
    # timestamp; all permission times are naive UTC datetimes.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str, raising ValueError on malformed input."""
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, skipping absent components.
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed (md5, or md5+size)."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # Accept 1-8 hex digits (a Unix timestamp) and store it as a
        # naive UTC datetime.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the A-hint string, or None if signature/expiry is unset."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint like 'A<sig>@<hex time>'."""
        parts = s[1:].split('@', 1)
        if len(parts) != 2:
            # Bug fix: the previous version tried to catch a missing '@'
            # with `except IndexError`, but unpacking a 1-element list
            # raises ValueError, so the intended message was never shown.
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig, self.perm_expiry = parts

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired as of as_of_dt.

        A locator without a permission hint never expires.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # Bug fix: perm_expiry is naive UTC (from utcfromtimestamp),
            # so the default comparison point must be UTC too -- now()
            # would be off by the local UTC offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
119
120
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        # Reuse the cached client only while the configuration is unchanged.
        if global_client_object is not None and cls._last_key == key:
            return global_client_object
        global_client_object = KeepClient()
        cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared global KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the shared global KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
155
class KeepBlockCache(object):
    """In-memory LRU cache of Keep block contents, keyed by locator.

    The most recently used slot is kept at the front of the internal
    list; cap_cache() evicts finished slots from the back.
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []  # CacheSlot list, most recently used first
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block: readers wait on `ready` until a
        writer calls set() with the block content (or None on error)."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until content has been set, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block content and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Cached byte count (0 while the block is still pending)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                evicted = False
                # Evict the least recently used slot that has finished
                # loading; pending slots must be left in place.
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        evicted = True
                        break
                if not evicted:
                    # Bug fix: every remaining slot is still being
                    # fetched, so sm cannot shrink this pass -- without
                    # this guard the loop would busy-spin until another
                    # thread finished a slot.  Leave the cache over
                    # budget; the next cap_cache() will retry.
                    break
                sm = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Test if the locator is already in the cache.
        # Caller must hold self._cache_lock.
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # move it to the front (most recently used)
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if absent."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new, not-yet-ready cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
227
class Counter(object):
    """A simple integer counter that can be updated from multiple threads."""

    def __init__(self, v=0):
        # Lock guarding _val so concurrent add()/get() calls are atomic.
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Atomically add v (which may be negative) to the counter."""
        with self._lk:
            self._val = self._val + v

    def get(self):
        """Return the current counter value."""
        with self._lk:
            return self._val
240
241
class KeepClient(object):
    """Client for reading and writing data blocks via Keep services."""

    # Timeout tuples are (connection_timeout_seconds, read_timeout_seconds,
    # minimum_bandwidth_bytes_per_second); see _setcurltimeouts().
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
252
253
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        # Exception types that indicate a (possibly transient) failure
        # to talk to the service, as opposed to a programming error.
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            # NOTE(review): the mutable default Queue.LifoQueue() is created
            # once at class-definition time, so every KeepService constructed
            # without an explicit pool shares the same pool of pycurl handles
            # -- presumably intentional (handle reuse), but worth confirming.
            self.root = root
            self._user_agent_pool = user_agent_pool
            # _result records the outcome of the most recent request:
            # 'error' is None (no request yet), False (success), or an
            # exception instance describing the failure.
            self._result = {'error': None}
            self._usable = True
            self._session = None
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            # Optional Counter objects tracking bytes transferred.
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            # 'error' == False means the last request succeeded; the
            # comparison must be `== False` (not truthiness) because
            # 'error' may also be None or an exception instance.
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # Raw result dict from the most recent request attempt.
            return self._result

        def _get_user_agent(self):
            # Reuse a pooled pycurl handle if one is available;
            # otherwise create a fresh one.
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            # Return a handle to the pool for reuse; best-effort -- if it
            # cannot be reset or the pool refuses it, just close it.
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except:
                ua.close()

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, method="GET", timeout=None):
            """Perform a GET (or HEAD) for locator against this service.

            Returns the block content on success (True for HEAD), or
            None on failure; details are available via last_result().
            """
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            # ok stays None if no HTTP status was received at all;
            # True/False reflect HTTP-level success/failure.
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)

                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize any pycurl-level failure into an
                        # HttpError so HTTP_ERRORS below catches it.
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            # The service stays usable unless we saw a definitive HTTP
            # failure (ok is None when no status was received).
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                # NOTE(review): _result has no 'content-length' key (the
                # response headers live in _result['headers']), so this
                # log line always reports None for the byte count.
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result.get('content-length'))
                return True

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Verify the payload against the md5 embedded in the locator.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            """PUT body to this service under hash_s.

            Returns True on success, False otherwise; the raw response
            is available via last_result().
            """
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            # Same tri-state as in get(): None = no HTTP status received.
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize pycurl-level failures into HttpError.
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            """Apply connection/read/bandwidth timeouts to a curl handle.

            `timeouts` may be None (no-op), a scalar (used for both
            connection and read timeouts, with the default bandwidth
            floor), a 2-tuple (connection, read), or a 3-tuple
            (connection, read, minimum bytes/sec).
            """
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            # Abort the transfer if the rate stays below bandwidth_bps
            # for xfer_t seconds (cURL's "low speed" watchdog).
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            """pycurl HEADERFUNCTION callback: accumulate response headers
            (lower-cased names) into self._headers, folding continuation
            lines into the previous header's value.
            """
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # Continuation of the previous header (obsolete RFC 822
                # style line folding).
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                # The status line itself, stashed under a synthetic key.
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
498     
499
    class KeepWriterQueue(Queue.Queue):
        """Task queue for KeepWriterThread workers.

        Each task is a (KeepService, service_root) pair.  The queue also
        tracks how many copies of the block have been stored so far, so
        workers can stop as soon as enough copies exist.
        """
        def __init__(self, copies):
            Queue.Queue.__init__(self) # Old-style superclass
            # Number of copies the caller asked for.
            self.wanted_copies = copies
            self.successful_copies = 0
            # Response body from the most recent successful PUT.
            self.response = None
            self.successful_copies_lock = threading.Lock()
            # Number of write attempts currently allowed to proceed;
            # decremented when a worker takes a task, incremented again
            # by write_fail() when an attempt fails.
            self.pending_tries = copies
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr):
            # Record that one service stored replicas_nr copies, then
            # wake all waiting workers so they can re-check progress.
            with self.successful_copies_lock:
                self.successful_copies += replicas_nr
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # A write attempt failed: allow another try and wake one
            # waiting worker to pick it up.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            # Copies still needed to satisfy the caller's request.
            with self.successful_copies_lock:
                return self.wanted_copies - self.successful_copies

        def get_next_task(self):
            """Return the next (service, service_root) task to attempt.

            Raises Queue.Empty when no further work is needed (enough
            copies stored) or no more services are available to try.
            """
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            # This service already succeeded or failed
                            # permanently; skip it.
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise Queue.Empty
                    else:
                        # Tasks remain but no tries are allowed right
                        # now; wait for a success or failure report.
                        self.pending_tries_notification.wait()
553
554
555     class KeepWriterThreadPool(object):
556         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
557             self.total_task_nr = 0
558             self.wanted_copies = copies
559             if (not max_service_replicas) or (max_service_replicas >= copies):
560                 num_threads = 1
561             else:
562                 num_threads = int(math.ceil(float(copies) / max_service_replicas))
563             _logger.debug("Pool max threads is %d", num_threads)
564             self.workers = []
565             self.queue = KeepClient.KeepWriterQueue(copies)
566             # Create workers
567             for _ in range(num_threads):
568                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
569                 self.workers.append(w)
570         
571         def add_task(self, ks, service_root):
572             self.queue.put((ks, service_root))
573             self.total_task_nr += 1
574         
575         def done(self):
576             return self.queue.successful_copies
577         
578         def join(self):
579             # Start workers
580             for worker in self.workers:
581                 worker.start()
582             # Wait for finished work
583             self.queue.join()
584         
585         def response(self):
586             return self.queue.response
587     
588     
    class KeepWriterThread(threading.Thread):
        """Daemon thread that performs PUT tasks from a KeepWriterQueue."""
        # Sentinel exception instance: raised by do_task() for failures
        # the service layer has already logged, so run() can skip the
        # extra traceback.
        TaskFailed = RuntimeError()

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            # Keep taking tasks until the queue reports no more work.
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except Queue.Empty:
                    return
                try:
                    locator, copies = self.do_task(service, service_root)
                except Exception as e:
                    # Identity check: TaskFailed is the pre-logged
                    # sentinel; anything else is unexpected and worth a
                    # full traceback.
                    if e is not self.TaskFailed:
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """Attempt one PUT; return (locator, replicas_stored).

            Raises the TaskFailed sentinel when the service reports
            failure.
            """
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout))
            result = service.last_result()

            if not success:
                if result.get('status_code', None):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result['status_code'],
                                  result['body'])
                raise self.TaskFailed

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                # Header missing or malformed: assume a single replica.
                replicas_stored = 1

            return result['body'].strip(), replicas_stored
642
643
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        # NOTE(review): `session` is accepted but not referenced anywhere in
        # this constructor (at least in the portion shown here) -- confirm
        # whether it is still needed.
        self.lock = threading.Lock()
        if proxy is None:
            # ARVADOS_KEEP_SERVICES (a space-separated list of URIs)
            # takes precedence over the older ARVADOS_KEEP_PROXY setting.
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            # Mixing an explicit token with an API client is ambiguous:
            # the client already carries its own token.
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable pycurl handles shared by this client's
        # KeepService objects.
        self._user_agent_pool = Queue.LifoQueue()
        # Transfer statistics, updated as requests complete.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            # Local-store mode: replace the network get/put entry points
            # with their directory-backed equivalents.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Proxy mode: build a static, hand-made service list
                # instead of discovering services via the API server.
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urlparse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
768
769     def current_timeout(self, attempt_number):
770         """Return the appropriate timeout to use for this client.
771
772         The proxy timeout setting if the backend service is currently a proxy,
773         the regular timeout setting otherwise.  The `attempt_number` indicates
774         how many times the operation has been tried already (starting from 0
775         for the first try), and scales the connection timeout portion of the
776         return value accordingly.
777
778         """
779         # TODO(twp): the timeout should be a property of a
780         # KeepService, not a KeepClient. See #4488.
781         t = self.proxy_timeout if self.using_proxy else self.timeout
782         if len(t) == 2:
783             return (t[0] * (1 << attempt_number), t[1])
784         else:
785             return (t[0] * (1 << attempt_number), t[1], t[2])
786     def _any_nondisk_services(self, service_list):
787         return any(ks.get('service_type', 'disk') != 'disk'
788                    for ks in service_list)
789
    def build_services_list(self, force_rebuild=False):
        """Fetch and cache the list of Keep services from the API server.

        Populates self._gateway_services (all services, keyed by UUID),
        self._keep_services (non-gateway services), and
        self._writable_services (non-read-only services), and sets
        self.max_replicas_per_service.

        No-op when this client was constructed with a static service
        list (proxy mode), or when the list is already cached and
        force_rebuild is false.

        Raises arvados.errors.NoKeepServersError if the API server
        reports no accessible services.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Gateway services are addressed only via +K@uuid hints, so
            # exclude them from the regular probe list.
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
832
833     def _service_weight(self, data_hash, service_uuid):
834         """Compute the weight of a Keep service endpoint for a data
835         block with a known hash.
836
837         The weight is md5(h + u) where u is the last 15 characters of
838         the service endpoint's UUID.
839         """
840         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
841
842     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
843         """Return an array of Keep service endpoints, in the order in
844         which they should be probed when reading or writing data with
845         the given hash+hints.
846         """
847         self.build_services_list(force_rebuild)
848
849         sorted_roots = []
850         # Use the services indicated by the given +K@... remote
851         # service hints, if any are present and can be resolved to a
852         # URI.
853         for hint in locator.hints:
854             if hint.startswith('K@'):
855                 if len(hint) == 7:
856                     sorted_roots.append(
857                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
858                 elif len(hint) == 29:
859                     svc = self._gateway_services.get(hint[2:])
860                     if svc:
861                         sorted_roots.append(svc['_service_root'])
862
863         # Sort the available local services by weight (heaviest first)
864         # for this locator, and return their service_roots (base URIs)
865         # in that order.
866         use_services = self._keep_services
867         if need_writable:
868             use_services = self._writable_services
869         self.using_proxy = self._any_nondisk_services(use_services)
870         sorted_roots.extend([
871             svc['_service_root'] for svc in sorted(
872                 use_services,
873                 reverse=True,
874                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
875         _logger.debug("{}: {}".format(locator, sorted_roots))
876         return sorted_roots
877
878     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
879         # roots_map is a dictionary, mapping Keep service root strings
880         # to KeepService objects.  Poll for Keep services, and add any
881         # new ones to roots_map.  Return the current list of local
882         # root strings.
883         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
884         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
885         for root in local_roots:
886             if root not in roots_map:
887                 roots_map[root] = self.KeepService(
888                     root, self._user_agent_pool,
889                     upload_counter=self.upload_counter,
890                     download_counter=self.download_counter,
891                     **headers)
892         return local_roots
893
894     @staticmethod
895     def _check_loop_result(result):
896         # KeepClient RetryLoops should save results as a 2-tuple: the
897         # actual result of the request, and the number of servers available
898         # to receive the request this round.
899         # This method returns True if there's a real result, False if
900         # there are no more servers available, otherwise None.
901         if isinstance(result, Exception):
902             return None
903         result, tried_server_count = result
904         if (result is not None) and (result is not False):
905             return True
906         elif tried_server_count < 1:
907             _logger.info("No more Keep services to try; giving up")
908             return False
909         else:
910             return None
911
912     def get_from_cache(self, loc):
913         """Fetch a block only if is in the cache, otherwise return None."""
914         slot = self.block_cache.get(loc)
915         if slot is not None and slot.ready.is_set():
916             return slot.get()
917         else:
918             return None
919
920     @retry.retry_method
921     def head(self, loc_s, num_retries=None):
922         return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
923
924     @retry.retry_method
925     def get(self, loc_s, num_retries=None):
926         return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
927
928     def _get_or_head(self, loc_s, method="GET", num_retries=None):
929         """Get data from Keep.
930
931         This method fetches one or more blocks of data from Keep.  It
932         sends a request each Keep service registered with the API
933         server (or the proxy provided when this client was
934         instantiated), then each service named in location hints, in
935         sequence.  As soon as one service provides the data, it's
936         returned.
937
938         Arguments:
939         * loc_s: A string of one or more comma-separated locators to fetch.
940           This method returns the concatenation of these blocks.
941         * num_retries: The number of times to retry GET requests to
942           *each* Keep server if it returns temporary failures, with
943           exponential backoff.  Note that, in each loop, the method may try
944           to fetch data from every available Keep service, along with any
945           that are named in location hints in the locator.  The default value
946           is set when the KeepClient is initialized.
947         """
948         if ',' in loc_s:
949             return ''.join(self.get(x) for x in loc_s.split(','))
950
951         self.get_counter.add(1)
952
953         locator = KeepLocator(loc_s)
954         if method == "GET":
955             slot, first = self.block_cache.reserve_cache(locator.md5sum)
956             if not first:
957                 self.hits_counter.add(1)
958                 v = slot.get()
959                 return v
960
961         self.misses_counter.add(1)
962
963         # If the locator has hints specifying a prefix (indicating a
964         # remote keepproxy) or the UUID of a local gateway service,
965         # read data from the indicated service(s) instead of the usual
966         # list of local disk services.
967         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
968                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
969         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
970                            for hint in locator.hints if (
971                                    hint.startswith('K@') and
972                                    len(hint) == 29 and
973                                    self._gateway_services.get(hint[2:])
974                                    )])
975         # Map root URLs to their KeepService objects.
976         roots_map = {
977             root: self.KeepService(root, self._user_agent_pool,
978                                    upload_counter=self.upload_counter,
979                                    download_counter=self.download_counter)
980             for root in hint_roots
981         }
982
983         # See #3147 for a discussion of the loop implementation.  Highlights:
984         # * Refresh the list of Keep services after each failure, in case
985         #   it's being updated.
986         # * Retry until we succeed, we're out of retries, or every available
987         #   service has returned permanent failure.
988         sorted_roots = []
989         roots_map = {}
990         blob = None
991         loop = retry.RetryLoop(num_retries, self._check_loop_result,
992                                backoff_start=2)
993         for tries_left in loop:
994             try:
995                 sorted_roots = self.map_new_services(
996                     roots_map, locator,
997                     force_rebuild=(tries_left < num_retries),
998                     need_writable=False)
999             except Exception as error:
1000                 loop.save_result(error)
1001                 continue
1002
1003             # Query KeepService objects that haven't returned
1004             # permanent failure, in our specified shuffle order.
1005             services_to_try = [roots_map[root]
1006                                for root in sorted_roots
1007                                if roots_map[root].usable()]
1008             for keep_service in services_to_try:
1009                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1010                 if blob is not None:
1011                     break
1012             loop.save_result((blob, len(services_to_try)))
1013
1014         # Always cache the result, then return it if we succeeded.
1015         if method == "GET":
1016             slot.set(blob)
1017             self.block_cache.cap_cache()
1018         if loop.success():
1019             if method == "HEAD":
1020                 return True
1021             else:
1022                 return blob
1023
1024         # Q: Including 403 is necessary for the Keep tests to continue
1025         # passing, but maybe they should expect KeepReadError instead?
1026         not_founds = sum(1 for key in sorted_roots
1027                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1028         service_errors = ((key, roots_map[key].last_result()['error'])
1029                           for key in sorted_roots)
1030         if not roots_map:
1031             raise arvados.errors.KeepReadError(
1032                 "failed to read {}: no Keep services available ({})".format(
1033                     loc_s, loop.last_result()))
1034         elif not_founds == len(sorted_roots):
1035             raise arvados.errors.NotFoundError(
1036                 "{} not found".format(loc_s), service_errors)
1037         else:
1038             raise arvados.errors.KeepReadError(
1039                 "failed to read {}".format(loc_s), service_errors, label="service")
1040
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.

        Raises arvados.errors.ArgumentError if data is not a string,
        and arvados.errors.KeepWriteError if fewer than `copies`
        replicas could be written.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        # The locator we will return: content hash plus data size.
        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Replicas confirmed written so far, accumulated across attempts.
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Upload in parallel; each retry only needs to store the
            # copies previous attempts did not manage to write.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
                                                        data_hash=data_hash,
                                                        copies=copies - done,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                # Skip services that already accepted a copy.
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
1117
1118     def local_store_put(self, data, copies=1, num_retries=None):
1119         """A stub for put().
1120
1121         This method is used in place of the real put() method when
1122         using local storage (see constructor's local_store argument).
1123
1124         copies and num_retries arguments are ignored: they are here
1125         only for the sake of offering the same call signature as
1126         put().
1127
1128         Data stored this way can be retrieved via local_store_get().
1129         """
1130         md5 = hashlib.md5(data).hexdigest()
1131         locator = '%s+%d' % (md5, len(data))
1132         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1133             f.write(data)
1134         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1135                   os.path.join(self.local_store, md5))
1136         return locator
1137
1138     def local_store_get(self, loc_s, num_retries=None):
1139         """Companion to local_store_put()."""
1140         try:
1141             locator = KeepLocator(loc_s)
1142         except ValueError:
1143             raise arvados.errors.NotFoundError(
1144                 "Invalid data locator: '%s'" % loc_s)
1145         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1146             return ''
1147         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1148             return f.read()
1149
1150     def is_cached(self, locator):
1151         return self.block_cache.reserve_cache(expect_hash)