Merge branch '9897-log-block-prefetch-worker-exceptions' closes #9897
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import sys
13 import threading
14 import timer
15 import urlparse
16
17 import arvados
18 import arvados.config as config
19 import arvados.errors
20 import arvados.retry as retry
21 import arvados.util
22
# Module-level logger shared by every class in this file.
_logger = logging.getLogger('arvados.keep')
# Lazily-built singleton KeepClient, managed by the deprecated Keep class
# below; rebuilt whenever the relevant configuration changes.
global_client_object = None
25
26
# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
# KeepService._socket_open (below) sets TCP_KEEPINTVL unconditionally, so it
# must exist on every platform this module runs on.
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
36
37
class KeepLocator(object):
    """Parse, validate, and reassemble one Keep locator string.

    A locator has the form ``md5sum[+size][+Asig@expiry][+hint...]``:
    an MD5 digest, an optional byte count, an optional 'A' permission
    hint (hex signature + hex Unix timestamp), and any other hints,
    joined by '+'.
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare "md5sum" locator with no size field.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, skipping whichever parts are unset.
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator as md5sum[+size], with all hints removed."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        # Stored as a UTC-naive datetime derived from the hex timestamp.
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'Asig@expiry' hint string, or None if sig/expiry unset."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an 'Asig@expiry' hint into perm_sig and perm_expiry."""
        try:
            sig, expiry = s[1:].split('@', 1)
        except ValueError:
            # A hint with no '@' makes split() return a single element,
            # so the unpacking raises ValueError -- not IndexError, which
            # the previous code caught and therefore never triggered.
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired as of as_of_dt.

        Returns False when the locator carries no permission hint.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry comes from utcfromtimestamp(), so the default
            # comparison point must also be UTC; the previous code used
            # local datetime.now(), which misjudged expiry on any host
            # whose local timezone is not UTC.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
119
120
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        # Previously, KeepClient would change its behavior at runtime
        # based on these configuration settings.  We simulate that here
        # by rebuilding the shared client whenever any setting changes.
        global global_client_object
        settings = (config.get('ARVADOS_API_HOST'),
                    config.get('ARVADOS_API_TOKEN'),
                    config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                    config.get('ARVADOS_KEEP_PROXY'),
                    config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                    os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is None or cls._last_key != settings:
            global_client_object = KeepClient()
            cls._last_key = settings
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared global KeepClient."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store data through the shared global KeepClient."""
        return Keep.global_client_object().put(data, **kwargs)
155
class KeepBlockCache(object):
    """In-memory cache of Keep block data, kept in most-recently-used order.

    cap_cache() evicts ready blocks from the least-recently-used end
    until the total content size fits within cache_max bytes.
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block's content, fillable from another thread."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the content has been set, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block content and wake all waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the content size in bytes (0 while unfilled)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            # Track total size incrementally instead of re-summing the whole
            # cache after every eviction (previously O(n^2)).
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in range(len(self._cache)-1, -1, -1):
                    # Evict the least recently used slot that is ready.
                    if self._cache[i].ready.is_set():
                        sm -= self._cache[i].size()
                        del self._cache[i]
                        break
                else:
                    # No ready slot left to evict.  Previously this case
                    # looped forever (nothing deleted, sm unchanged); stop
                    # and let a later cap_cache() call finish the job.
                    break

    def _get(self, locator):
        # Test if the locator is already in the cache.
        # Caller must hold self._cache_lock.
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # Move the hit to the front to maintain MRU order.
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cached slot for locator, or None.  Thread-safe."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n is not None:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
227
class Counter(object):
    """A simple thread-safe integer counter."""

    def __init__(self, v=0):
        # All access to the running total goes through this lock.
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Atomically add v (which may be negative) to the counter."""
        with self._lk:
            self._val += v

    def get(self):
        """Return the current counter value."""
        with self._lk:
            return self._val
240
241
242 class KeepClient(object):
243
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    # Each tuple is (connection_timeout, read_timeout, minimum_bandwidth),
    # as interpreted by KeepService._setcurltimeouts().
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
252
253
254     class KeepService(object):
255         """Make requests to a single Keep service, and track results.
256
257         A KeepService is intended to last long enough to perform one
258         transaction (GET or PUT) against one Keep service. This can
259         involve calling either get() or put() multiple times in order
260         to retry after transient failures. However, calling both get()
261         and put() on a single instance -- or using the same instance
262         to access two different Keep services -- will not produce
263         sensible behavior.
264         """
265
266         HTTP_ERRORS = (
267             socket.error,
268             ssl.SSLError,
269             arvados.errors.HttpError,
270         )
271
272         def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
273                      upload_counter=None,
274                      download_counter=None, **headers):
275             self.root = root
276             self._user_agent_pool = user_agent_pool
277             self._result = {'error': None}
278             self._usable = True
279             self._session = None
280             self.get_headers = {'Accept': 'application/octet-stream'}
281             self.get_headers.update(headers)
282             self.put_headers = headers
283             self.upload_counter = upload_counter
284             self.download_counter = download_counter
285
286         def usable(self):
287             """Is it worth attempting a request?"""
288             return self._usable
289
290         def finished(self):
291             """Did the request succeed or encounter permanent failure?"""
292             return self._result['error'] == False or not self._usable
293
294         def last_result(self):
295             return self._result
296
297         def _get_user_agent(self):
298             try:
299                 return self._user_agent_pool.get(False)
300             except Queue.Empty:
301                 return pycurl.Curl()
302
303         def _put_user_agent(self, ua):
304             try:
305                 ua.reset()
306                 self._user_agent_pool.put(ua, False)
307             except:
308                 ua.close()
309
310         @staticmethod
311         def _socket_open(family, socktype, protocol, address=None):
312             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
313             s = socket.socket(family, socktype, protocol)
314             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
315             # Will throw invalid protocol error on mac. This test prevents that.
316             if hasattr(socket, 'TCP_KEEPIDLE'):
317                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
318             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
319             return s
320
321         def get(self, locator, method="GET", timeout=None):
322             # locator is a KeepLocator object.
323             url = self.root + str(locator)
324             _logger.debug("Request: %s %s", method, url)
325             curl = self._get_user_agent()
326             ok = None
327             try:
328                 with timer.Timer() as t:
329                     self._headers = {}
330                     response_body = cStringIO.StringIO()
331                     curl.setopt(pycurl.NOSIGNAL, 1)
332                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
333                     curl.setopt(pycurl.URL, url.encode('utf-8'))
334                     curl.setopt(pycurl.HTTPHEADER, [
335                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
336                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
337                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
338                     if method == "HEAD":
339                         curl.setopt(pycurl.NOBODY, True)
340                     self._setcurltimeouts(curl, timeout)
341
342                     try:
343                         curl.perform()
344                     except Exception as e:
345                         raise arvados.errors.HttpError(0, str(e))
346                     self._result = {
347                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
348                         'body': response_body.getvalue(),
349                         'headers': self._headers,
350                         'error': False,
351                     }
352
353                 ok = retry.check_http_response_success(self._result['status_code'])
354                 if not ok:
355                     self._result['error'] = arvados.errors.HttpError(
356                         self._result['status_code'],
357                         self._headers.get('x-status-line', 'Error'))
358             except self.HTTP_ERRORS as e:
359                 self._result = {
360                     'error': e,
361                 }
362             self._usable = ok != False
363             if self._result.get('status_code', None):
364                 # The client worked well enough to get an HTTP status
365                 # code, so presumably any problems are just on the
366                 # server side and it's OK to reuse the client.
367                 self._put_user_agent(curl)
368             else:
369                 # Don't return this client to the pool, in case it's
370                 # broken.
371                 curl.close()
372             if not ok:
373                 _logger.debug("Request fail: GET %s => %s: %s",
374                               url, type(self._result['error']), str(self._result['error']))
375                 return None
376             if method == "HEAD":
377                 _logger.info("HEAD %s: %s bytes",
378                          self._result['status_code'],
379                          self._result.get('content-length'))
380                 return True
381
382             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
383                          self._result['status_code'],
384                          len(self._result['body']),
385                          t.msecs,
386                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
387
388             if self.download_counter:
389                 self.download_counter.add(len(self._result['body']))
390             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
391             if resp_md5 != locator.md5sum:
392                 _logger.warning("Checksum fail: md5(%s) = %s",
393                                 url, resp_md5)
394                 self._result['error'] = arvados.errors.HttpError(
395                     0, 'Checksum fail')
396                 return None
397             return self._result['body']
398
399         def put(self, hash_s, body, timeout=None):
400             url = self.root + hash_s
401             _logger.debug("Request: PUT %s", url)
402             curl = self._get_user_agent()
403             ok = None
404             try:
405                 with timer.Timer() as t:
406                     self._headers = {}
407                     body_reader = cStringIO.StringIO(body)
408                     response_body = cStringIO.StringIO()
409                     curl.setopt(pycurl.NOSIGNAL, 1)
410                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
411                     curl.setopt(pycurl.URL, url.encode('utf-8'))
412                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
413                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
414                     # response) instead of sending the request body immediately.
415                     # This allows the server to reject the request if the request
416                     # is invalid or the server is read-only, without waiting for
417                     # the client to send the entire block.
418                     curl.setopt(pycurl.UPLOAD, True)
419                     curl.setopt(pycurl.INFILESIZE, len(body))
420                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
421                     curl.setopt(pycurl.HTTPHEADER, [
422                         '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
423                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
424                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
425                     self._setcurltimeouts(curl, timeout)
426                     try:
427                         curl.perform()
428                     except Exception as e:
429                         raise arvados.errors.HttpError(0, str(e))
430                     self._result = {
431                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
432                         'body': response_body.getvalue(),
433                         'headers': self._headers,
434                         'error': False,
435                     }
436                 ok = retry.check_http_response_success(self._result['status_code'])
437                 if not ok:
438                     self._result['error'] = arvados.errors.HttpError(
439                         self._result['status_code'],
440                         self._headers.get('x-status-line', 'Error'))
441             except self.HTTP_ERRORS as e:
442                 self._result = {
443                     'error': e,
444                 }
445             self._usable = ok != False # still usable if ok is True or None
446             if self._result.get('status_code', None):
447                 # Client is functional. See comment in get().
448                 self._put_user_agent(curl)
449             else:
450                 curl.close()
451             if not ok:
452                 _logger.debug("Request fail: PUT %s => %s: %s",
453                               url, type(self._result['error']), str(self._result['error']))
454                 return False
455             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
456                          self._result['status_code'],
457                          len(body),
458                          t.msecs,
459                          (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
460             if self.upload_counter:
461                 self.upload_counter.add(len(body))
462             return True
463
464         def _setcurltimeouts(self, curl, timeouts):
465             if not timeouts:
466                 return
467             elif isinstance(timeouts, tuple):
468                 if len(timeouts) == 2:
469                     conn_t, xfer_t = timeouts
470                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
471                 else:
472                     conn_t, xfer_t, bandwidth_bps = timeouts
473             else:
474                 conn_t, xfer_t = (timeouts, timeouts)
475                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
476             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
477             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
478             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
479
480         def _headerfunction(self, header_line):
481             header_line = header_line.decode('iso-8859-1')
482             if ':' in header_line:
483                 name, value = header_line.split(':', 1)
484                 name = name.strip().lower()
485                 value = value.strip()
486             elif self._headers:
487                 name = self._lastheadername
488                 value = self._headers[name] + ' ' + header_line.strip()
489             elif header_line.startswith('HTTP/'):
490                 name = 'x-status-line'
491                 value = header_line
492             else:
493                 _logger.error("Unexpected header line: %s", header_line)
494                 return
495             self._lastheadername = name
496             self._headers[name] = value
497             # Returning None implies all bytes were written
498     
499
500     class KeepWriterQueue(Queue.Queue):
501         def __init__(self, copies):
502             Queue.Queue.__init__(self) # Old-style superclass
503             self.wanted_copies = copies
504             self.successful_copies = 0
505             self.response = None
506             self.successful_copies_lock = threading.Lock()
507             self.pending_tries = copies
508             self.pending_tries_notification = threading.Condition()
509         
510         def write_success(self, response, replicas_nr):
511             with self.successful_copies_lock:
512                 self.successful_copies += replicas_nr
513                 self.response = response
514         
515         def write_fail(self, ks, status_code):
516             with self.pending_tries_notification:
517                 self.pending_tries += 1
518                 self.pending_tries_notification.notify()
519         
520         def pending_copies(self):
521             with self.successful_copies_lock:
522                 return self.wanted_copies - self.successful_copies
523     
524     
525     class KeepWriterThreadPool(object):
526         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
527             self.total_task_nr = 0
528             self.wanted_copies = copies
529             if (not max_service_replicas) or (max_service_replicas >= copies):
530                 num_threads = 1
531             else:
532                 num_threads = int(math.ceil(float(copies) / max_service_replicas))
533             _logger.debug("Pool max threads is %d", num_threads)
534             self.workers = []
535             self.queue = KeepClient.KeepWriterQueue(copies)
536             # Create workers
537             for _ in range(num_threads):
538                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
539                 self.workers.append(w)
540         
541         def add_task(self, ks, service_root):
542             self.queue.put((ks, service_root))
543             self.total_task_nr += 1
544         
545         def done(self):
546             return self.queue.successful_copies
547         
548         def join(self):
549             # Start workers
550             for worker in self.workers:
551                 worker.start()
552             # Wait for finished work
553             self.queue.join()
554             with self.queue.pending_tries_notification:
555                 self.queue.pending_tries_notification.notify_all()
556             for worker in self.workers:
557                 worker.join()
558         
559         def response(self):
560             return self.queue.response
561     
562     
    class KeepWriterThread(threading.Thread):
        """Worker that writes one block to Keep services.

        Repeatedly takes (KeepService, service_root) tasks from the
        shared KeepWriterQueue and attempts a PUT, reporting successes
        and failures back to the queue, until enough copies are stored
        or the queue is drained.
        """

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            # Per-request timeout, passed through to KeepService.put().
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash

        def run(self):
            while not self.queue.empty():
                if self.queue.pending_copies() > 0:
                    # Avoid overreplication, wait for some needed re-attempt
                    with self.queue.pending_tries_notification:
                        if self.queue.pending_tries <= 0:
                            self.queue.pending_tries_notification.wait()
                            continue # try again when awake
                        # Claim one write attempt before releasing the lock.
                        self.queue.pending_tries -= 1

                    # Get to work
                    try:
                        service, service_root = self.queue.get_nowait()
                    except Queue.Empty:
                        continue
                    if service.finished():
                        # This service already gave a definitive answer
                        # (success or permanent failure); consume the
                        # task without contacting it again.
                        self.queue.task_done()
                        continue
                    success = bool(service.put(self.data_hash,
                                                self.data,
                                                timeout=self.timeout))
                    result = service.last_result()
                    if success:
                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                                      str(threading.current_thread()),
                                      self.data_hash,
                                      len(self.data),
                                      service_root)
                        try:
                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                        except (KeyError, ValueError):
                            # Assume one stored copy when the server does
                            # not report (or garbles) the replica header.
                            replicas_stored = 1

                        self.queue.write_success(result['body'].strip(), replicas_stored)
                    else:
                        if result.get('status_code', None):
                            _logger.debug("Request fail: PUT %s => %s %s",
                                          self.data_hash,
                                          result['status_code'],
                                          result['body'])
                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
                    # Mark as done so the queue can be join()ed
                    self.queue.task_done()
                else:
                    # Enough copies already stored: drain leftover tasks
                    # so queue.join() can return.
                    # Remove the task from the queue anyways
                    try:
                        self.queue.get_nowait()
                        # Mark as done so the queue can be join()ed
                        self.queue.task_done()
                    except Queue.Empty:
                        continue
622
623
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want KeepClient to not use a proxy, pass in an empty
          string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :block_cache:
          The KeepBlockCache to use.  If not specified, a new cache with
          default size is created.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :session:
          NOTE(review): not referenced by this initializer; appears to be
          accepted for interface compatibility -- confirm before relying
          on it.
        """
        self.lock = threading.Lock()
        if proxy is None:
            # ARVADOS_KEEP_SERVICES takes precedence over the older
            # ARVADOS_KEEP_PROXY setting.
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of idle pycurl handles shared by this client's KeepServices.
        self._user_agent_pool = Queue.LifoQueue()
        # Usage-statistics counters.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            # Local-store mode bypasses Keep entirely (mainly for tests):
            # swap in the filesystem-backed get/put implementations.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Proxy mode: build a static, synthetic service list from
                # the whitespace-separated proxy URIs.
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urlparse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                # Fabricated uuids: proxies aren't registered services.
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
748
749     def current_timeout(self, attempt_number):
750         """Return the appropriate timeout to use for this client.
751
752         The proxy timeout setting if the backend service is currently a proxy,
753         the regular timeout setting otherwise.  The `attempt_number` indicates
754         how many times the operation has been tried already (starting from 0
755         for the first try), and scales the connection timeout portion of the
756         return value accordingly.
757
758         """
759         # TODO(twp): the timeout should be a property of a
760         # KeepService, not a KeepClient. See #4488.
761         t = self.proxy_timeout if self.using_proxy else self.timeout
762         if len(t) == 2:
763             return (t[0] * (1 << attempt_number), t[1])
764         else:
765             return (t[0] * (1 << attempt_number), t[1], t[2])
766     def _any_nondisk_services(self, service_list):
767         return any(ks.get('service_type', 'disk') != 'disk'
768                    for ks in service_list)
769
    def build_services_list(self, force_rebuild=False):
        """Fetch and cache the lists of Keep services from the API server.

        No-op when the service list was fixed at construction time
        (proxy mode), or when a list is already cached and
        force_rebuild is false.  Otherwise populates
        self._gateway_services, self._keep_services,
        self._writable_services and self.max_replicas_per_service.
        """
        # NOTE(review): this pre-check reads self._keep_services outside
        # self.lock, so two threads may both fall through and fetch;
        # the second fetch just overwrites the first — confirm benign.
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Everything except gateway services is eligible for normal
            # read requests; writable services additionally exclude
            # read-only ones.
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
812
813     def _service_weight(self, data_hash, service_uuid):
814         """Compute the weight of a Keep service endpoint for a data
815         block with a known hash.
816
817         The weight is md5(h + u) where u is the last 15 characters of
818         the service endpoint's UUID.
819         """
820         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
821
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.

        Remote/gateway roots named in +K@ hints come first, followed by
        the local services sorted by per-block weight.  Also updates
        self.using_proxy as a side effect.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    # 5-character cluster prefix: derive the public
                    # keepproxy URL for that cluster.
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # Full 27-character service UUID: look up a local
                    # gateway service, skipping unknown UUIDs.
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
857
858     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
859         # roots_map is a dictionary, mapping Keep service root strings
860         # to KeepService objects.  Poll for Keep services, and add any
861         # new ones to roots_map.  Return the current list of local
862         # root strings.
863         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
864         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
865         for root in local_roots:
866             if root not in roots_map:
867                 roots_map[root] = self.KeepService(
868                     root, self._user_agent_pool,
869                     upload_counter=self.upload_counter,
870                     download_counter=self.download_counter,
871                     **headers)
872         return local_roots
873
874     @staticmethod
875     def _check_loop_result(result):
876         # KeepClient RetryLoops should save results as a 2-tuple: the
877         # actual result of the request, and the number of servers available
878         # to receive the request this round.
879         # This method returns True if there's a real result, False if
880         # there are no more servers available, otherwise None.
881         if isinstance(result, Exception):
882             return None
883         result, tried_server_count = result
884         if (result is not None) and (result is not False):
885             return True
886         elif tried_server_count < 1:
887             _logger.info("No more Keep services to try; giving up")
888             return False
889         else:
890             return None
891
892     def get_from_cache(self, loc):
893         """Fetch a block only if is in the cache, otherwise return None."""
894         slot = self.block_cache.get(loc)
895         if slot is not None and slot.ready.is_set():
896             return slot.get()
897         else:
898             return None
899
    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        """Check whether a block is retrievable from Keep.

        Returns True on success; raises on failure.  See
        _get_or_head() for argument details.
        """
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
903
    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        """Fetch block data from Keep.

        Returns the block contents; raises on failure.  See
        _get_or_head() for argument details.
        """
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
907
908     def _get_or_head(self, loc_s, method="GET", num_retries=None):
909         """Get data from Keep.
910
911         This method fetches one or more blocks of data from Keep.  It
912         sends a request each Keep service registered with the API
913         server (or the proxy provided when this client was
914         instantiated), then each service named in location hints, in
915         sequence.  As soon as one service provides the data, it's
916         returned.
917
918         Arguments:
919         * loc_s: A string of one or more comma-separated locators to fetch.
920           This method returns the concatenation of these blocks.
921         * num_retries: The number of times to retry GET requests to
922           *each* Keep server if it returns temporary failures, with
923           exponential backoff.  Note that, in each loop, the method may try
924           to fetch data from every available Keep service, along with any
925           that are named in location hints in the locator.  The default value
926           is set when the KeepClient is initialized.
927         """
928         if ',' in loc_s:
929             return ''.join(self.get(x) for x in loc_s.split(','))
930
931         self.get_counter.add(1)
932
933         locator = KeepLocator(loc_s)
934         if method == "GET":
935             slot, first = self.block_cache.reserve_cache(locator.md5sum)
936             if not first:
937                 self.hits_counter.add(1)
938                 v = slot.get()
939                 return v
940
941         self.misses_counter.add(1)
942
943         # If the locator has hints specifying a prefix (indicating a
944         # remote keepproxy) or the UUID of a local gateway service,
945         # read data from the indicated service(s) instead of the usual
946         # list of local disk services.
947         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
948                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
949         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
950                            for hint in locator.hints if (
951                                    hint.startswith('K@') and
952                                    len(hint) == 29 and
953                                    self._gateway_services.get(hint[2:])
954                                    )])
955         # Map root URLs to their KeepService objects.
956         roots_map = {
957             root: self.KeepService(root, self._user_agent_pool,
958                                    upload_counter=self.upload_counter,
959                                    download_counter=self.download_counter)
960             for root in hint_roots
961         }
962
963         # See #3147 for a discussion of the loop implementation.  Highlights:
964         # * Refresh the list of Keep services after each failure, in case
965         #   it's being updated.
966         # * Retry until we succeed, we're out of retries, or every available
967         #   service has returned permanent failure.
968         sorted_roots = []
969         roots_map = {}
970         blob = None
971         loop = retry.RetryLoop(num_retries, self._check_loop_result,
972                                backoff_start=2)
973         for tries_left in loop:
974             try:
975                 sorted_roots = self.map_new_services(
976                     roots_map, locator,
977                     force_rebuild=(tries_left < num_retries),
978                     need_writable=False)
979             except Exception as error:
980                 loop.save_result(error)
981                 continue
982
983             # Query KeepService objects that haven't returned
984             # permanent failure, in our specified shuffle order.
985             services_to_try = [roots_map[root]
986                                for root in sorted_roots
987                                if roots_map[root].usable()]
988             for keep_service in services_to_try:
989                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
990                 if blob is not None:
991                     break
992             loop.save_result((blob, len(services_to_try)))
993
994         # Always cache the result, then return it if we succeeded.
995         if method == "GET":
996             slot.set(blob)
997             self.block_cache.cap_cache()
998         if loop.success():
999             if method == "HEAD":
1000                 return True
1001             else:
1002                 return blob
1003
1004         # Q: Including 403 is necessary for the Keep tests to continue
1005         # passing, but maybe they should expect KeepReadError instead?
1006         not_founds = sum(1 for key in sorted_roots
1007                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1008         service_errors = ((key, roots_map[key].last_result()['error'])
1009                           for key in sorted_roots)
1010         if not roots_map:
1011             raise arvados.errors.KeepReadError(
1012                 "failed to read {}: no Keep services available ({})".format(
1013                     loc_s, loop.last_result()))
1014         elif not_founds == len(sorted_roots):
1015             raise arvados.errors.NotFoundError(
1016                 "{} not found".format(loc_s), service_errors)
1017         else:
1018             raise arvados.errors.KeepReadError(
1019                 "failed to read {}".format(loc_s), service_errors, label="service")
1020
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        # Python 2: accept unicode only when it is ASCII-encodable;
        # anything else must already be a byte string.
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        # copies < 1 is a no-op: report the locator without writing.
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # done counts replicas confirmed so far across loop iterations.
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Each iteration only asks for the replicas still missing.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left))
            # Skip services that already accepted a copy in a previous
            # iteration.
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
1097
1098     def local_store_put(self, data, copies=1, num_retries=None):
1099         """A stub for put().
1100
1101         This method is used in place of the real put() method when
1102         using local storage (see constructor's local_store argument).
1103
1104         copies and num_retries arguments are ignored: they are here
1105         only for the sake of offering the same call signature as
1106         put().
1107
1108         Data stored this way can be retrieved via local_store_get().
1109         """
1110         md5 = hashlib.md5(data).hexdigest()
1111         locator = '%s+%d' % (md5, len(data))
1112         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1113             f.write(data)
1114         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1115                   os.path.join(self.local_store, md5))
1116         return locator
1117
1118     def local_store_get(self, loc_s, num_retries=None):
1119         """Companion to local_store_put()."""
1120         try:
1121             locator = KeepLocator(loc_s)
1122         except ValueError:
1123             raise arvados.errors.NotFoundError(
1124                 "Invalid data locator: '%s'" % loc_s)
1125         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1126             return ''
1127         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1128             return f.read()
1129
1130     def is_cached(self, locator):
1131         return self.block_cache.reserve_cache(expect_hash)