Merge branch '6358-put-rendezvous' closes #6358
[arvados.git] / sdk / python / arvados / keep.py
1 import bz2
2 import datetime
3 import fcntl
4 import functools
5 import gflags
6 import hashlib
7 import json
8 import logging
9 import os
10 import pprint
11 import pycurl
12 import Queue
13 import re
14 import socket
15 import ssl
16 import string
17 import cStringIO
18 import subprocess
19 import sys
20 import threading
21 import time
22 import timer
23 import types
24 import UserDict
25 import zlib
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.retry as retry
31 import arvados.util
32
33 _logger = logging.getLogger('arvados.keep')
34 global_client_object = None
35
36
37 class KeepLocator(object):
38     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
39     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
40
41     def __init__(self, locator_str):
42         self.hints = []
43         self._perm_sig = None
44         self._perm_expiry = None
45         pieces = iter(locator_str.split('+'))
46         self.md5sum = next(pieces)
47         try:
48             self.size = int(next(pieces))
49         except StopIteration:
50             self.size = None
51         for hint in pieces:
52             if self.HINT_RE.match(hint) is None:
53                 raise ValueError("invalid hint format: {}".format(hint))
54             elif hint.startswith('A'):
55                 self.parse_permission_hint(hint)
56             else:
57                 self.hints.append(hint)
58
59     def __str__(self):
60         return '+'.join(
61             str(s) for s in [self.md5sum, self.size,
62                              self.permission_hint()] + self.hints
63             if s is not None)
64
65     def stripped(self):
66         if self.size is not None:
67             return "%s+%i" % (self.md5sum, self.size)
68         else:
69             return self.md5sum
70
71     def _make_hex_prop(name, length):
72         # Build and return a new property with the given name that
73         # must be a hex string of the given length.
74         data_name = '_{}'.format(name)
75         def getter(self):
76             return getattr(self, data_name)
77         def setter(self, hex_str):
78             if not arvados.util.is_hex(hex_str, length):
79                 raise ValueError("{} is not a {}-digit hex string: {}".
80                                  format(name, length, hex_str))
81             setattr(self, data_name, hex_str)
82         return property(getter, setter)
83
84     md5sum = _make_hex_prop('md5sum', 32)
85     perm_sig = _make_hex_prop('perm_sig', 40)
86
87     @property
88     def perm_expiry(self):
89         return self._perm_expiry
90
91     @perm_expiry.setter
92     def perm_expiry(self, value):
93         if not arvados.util.is_hex(value, 1, 8):
94             raise ValueError(
95                 "permission timestamp must be a hex Unix timestamp: {}".
96                 format(value))
97         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
98
99     def permission_hint(self):
100         data = [self.perm_sig, self.perm_expiry]
101         if None in data:
102             return None
103         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
104         return "A{}@{:08x}".format(*data)
105
106     def parse_permission_hint(self, s):
107         try:
108             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
109         except IndexError:
110             raise ValueError("bad permission hint {}".format(s))
111
112     def permission_expired(self, as_of_dt=None):
113         if self.perm_expiry is None:
114             return False
115         elif as_of_dt is None:
116             as_of_dt = datetime.datetime.now()
117         return self.perm_expiry <= as_of_dt
118
119
120 class Keep(object):
121     """Simple interface to a global KeepClient object.
122
123     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
124     own API client.  The global KeepClient will build an API client from the
125     current Arvados configuration, which may not match the one you built.
126     """
127     _last_key = None
128
129     @classmethod
130     def global_client_object(cls):
131         global global_client_object
132         # Previously, KeepClient would change its behavior at runtime based
133         # on these configuration settings.  We simulate that behavior here
134         # by checking the values and returning a new KeepClient if any of
135         # them have changed.
136         key = (config.get('ARVADOS_API_HOST'),
137                config.get('ARVADOS_API_TOKEN'),
138                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139                config.get('ARVADOS_KEEP_PROXY'),
140                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141                os.environ.get('KEEP_LOCAL_STORE'))
142         if (global_client_object is None) or (cls._last_key != key):
143             global_client_object = KeepClient()
144             cls._last_key = key
145         return global_client_object
146
147     @staticmethod
148     def get(locator, **kwargs):
149         return Keep.global_client_object().get(locator, **kwargs)
150
151     @staticmethod
152     def put(data, **kwargs):
153         return Keep.global_client_object().put(data, **kwargs)
154
155 class KeepBlockCache(object):
156     # Default RAM cache is 256MiB
157     def __init__(self, cache_max=(256 * 1024 * 1024)):
158         self.cache_max = cache_max
159         self._cache = []
160         self._cache_lock = threading.Lock()
161
162     class CacheSlot(object):
163         def __init__(self, locator):
164             self.locator = locator
165             self.ready = threading.Event()
166             self.content = None
167
168         def get(self):
169             self.ready.wait()
170             return self.content
171
172         def set(self, value):
173             self.content = value
174             self.ready.set()
175
176         def size(self):
177             if self.content is None:
178                 return 0
179             else:
180                 return len(self.content)
181
182     def cap_cache(self):
183         '''Cap the cache size to self.cache_max'''
184         with self._cache_lock:
185             # Select all slots except those where ready.is_set() and content is
186             # None (that means there was an error reading the block).
187             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
188             sm = sum([slot.size() for slot in self._cache])
189             while len(self._cache) > 0 and sm > self.cache_max:
190                 for i in xrange(len(self._cache)-1, -1, -1):
191                     if self._cache[i].ready.is_set():
192                         del self._cache[i]
193                         break
194                 sm = sum([slot.size() for slot in self._cache])
195
196     def _get(self, locator):
197         # Test if the locator is already in the cache
198         for i in xrange(0, len(self._cache)):
199             if self._cache[i].locator == locator:
200                 n = self._cache[i]
201                 if i != 0:
202                     # move it to the front
203                     del self._cache[i]
204                     self._cache.insert(0, n)
205                 return n
206         return None
207
208     def get(self, locator):
209         with self._cache_lock:
210             return self._get(locator)
211
212     def reserve_cache(self, locator):
213         '''Reserve a cache slot for the specified locator,
214         or return the existing slot.'''
215         with self._cache_lock:
216             n = self._get(locator)
217             if n:
218                 return n, False
219             else:
220                 # Add a new cache slot for the locator
221                 n = KeepBlockCache.CacheSlot(locator)
222                 self._cache.insert(0, n)
223                 return n, True
224
225 class KeepClient(object):
226
227     # Default Keep server connection timeout:  2 seconds
228     # Default Keep server read timeout:      300 seconds
229     # Default Keep proxy connection timeout:  20 seconds
230     # Default Keep proxy read timeout:       300 seconds
231     DEFAULT_TIMEOUT = (2, 300)
232     DEFAULT_PROXY_TIMEOUT = (20, 300)
233
234     class ThreadLimiter(object):
235         """
236         Limit the number of threads running at a given time to
237         {desired successes} minus {successes reported}. When successes
238         reported == desired, wake up the remaining threads and tell
239         them to quit.
240
241         Should be used in a "with" block.
242         """
243         def __init__(self, todo):
244             self._started = 0
245             self._todo = todo
246             self._done = 0
247             self._response = None
248             self._start_lock = threading.Condition()
249             self._todo_lock = threading.Semaphore(todo)
250             self._done_lock = threading.Lock()
251             self._local = threading.local()
252
253         def __enter__(self):
254             self._start_lock.acquire()
255             if getattr(self._local, 'sequence', None) is not None:
256                 # If the calling thread has used set_sequence(N), then
257                 # we wait here until N other threads have started.
258                 while self._started < self._local.sequence:
259                     self._start_lock.wait()
260             self._started += 1
261             self._start_lock.notifyAll()
262             self._todo_lock.acquire()
263             self._start_lock.release()
264             return self
265
266         def __exit__(self, type, value, traceback):
267             self._todo_lock.release()
268
269         def set_sequence(self, sequence):
270             self._local.sequence = sequence
271
272         def shall_i_proceed(self):
273             """
274             Return true if the current thread should do stuff. Return
275             false if the current thread should just stop.
276             """
277             with self._done_lock:
278                 return (self._done < self._todo)
279
280         def save_response(self, response_body, replicas_stored):
281             """
282             Records a response body (a locator, possibly signed) returned by
283             the Keep server.  It is not necessary to save more than
284             one response, since we presume that any locator returned
285             in response to a successful request is valid.
286             """
287             with self._done_lock:
288                 self._done += replicas_stored
289                 self._response = response_body
290
291         def response(self):
292             """
293             Returns the body from the response to a PUT request.
294             """
295             with self._done_lock:
296                 return self._response
297
298         def done(self):
299             """
300             Return how many successes were reported.
301             """
302             with self._done_lock:
303                 return self._done
304
305
306     class KeepService(object):
307         """Make requests to a single Keep service, and track results.
308
309         A KeepService is intended to last long enough to perform one
310         transaction (GET or PUT) against one Keep service. This can
311         involve calling either get() or put() multiple times in order
312         to retry after transient failures. However, calling both get()
313         and put() on a single instance -- or using the same instance
314         to access two different Keep services -- will not produce
315         sensible behavior.
316         """
317
318         HTTP_ERRORS = (
319             socket.error,
320             ssl.SSLError,
321             arvados.errors.HttpError,
322         )
323
324         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
325             self.root = root
326             self._user_agent_pool = user_agent_pool
327             self._result = {'error': None}
328             self._usable = True
329             self._session = None
330             self.get_headers = {'Accept': 'application/octet-stream'}
331             self.get_headers.update(headers)
332             self.put_headers = headers
333
334         def usable(self):
335             """Is it worth attempting a request?"""
336             return self._usable
337
338         def finished(self):
339             """Did the request succeed or encounter permanent failure?"""
340             return self._result['error'] == False or not self._usable
341
342         def last_result(self):
343             return self._result
344
345         def _get_user_agent(self):
346             try:
347                 return self._user_agent_pool.get(False)
348             except Queue.Empty:
349                 return pycurl.Curl()
350
351         def _put_user_agent(self, ua):
352             try:
353                 ua.reset()
354                 self._user_agent_pool.put(ua, False)
355             except:
356                 ua.close()
357
358         @staticmethod
359         def _socket_open(family, socktype, protocol, address=None):
360             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
361             s = socket.socket(family, socktype, protocol)
362             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
363             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
364             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
365             return s
366
367         def get(self, locator, timeout=None):
368             # locator is a KeepLocator object.
369             url = self.root + str(locator)
370             _logger.debug("Request: GET %s", url)
371             curl = self._get_user_agent()
372             try:
373                 with timer.Timer() as t:
374                     self._headers = {}
375                     response_body = cStringIO.StringIO()
376                     curl.setopt(pycurl.NOSIGNAL, 1)
377                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
378                     curl.setopt(pycurl.URL, url.encode('utf-8'))
379                     curl.setopt(pycurl.HTTPHEADER, [
380                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
381                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
382                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
383                     self._setcurltimeouts(curl, timeout)
384                     try:
385                         curl.perform()
386                     except Exception as e:
387                         raise arvados.errors.HttpError(0, str(e))
388                     self._result = {
389                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
390                         'body': response_body.getvalue(),
391                         'headers': self._headers,
392                         'error': False,
393                     }
394                 ok = retry.check_http_response_success(self._result['status_code'])
395                 if not ok:
396                     self._result['error'] = arvados.errors.HttpError(
397                         self._result['status_code'],
398                         self._headers.get('x-status-line', 'Error'))
399             except self.HTTP_ERRORS as e:
400                 self._result = {
401                     'error': e,
402                 }
403                 ok = False
404             self._usable = ok != False
405             if self._result.get('status_code', None):
406                 # The client worked well enough to get an HTTP status
407                 # code, so presumably any problems are just on the
408                 # server side and it's OK to reuse the client.
409                 self._put_user_agent(curl)
410             else:
411                 # Don't return this client to the pool, in case it's
412                 # broken.
413                 curl.close()
414             if not ok:
415                 _logger.debug("Request fail: GET %s => %s: %s",
416                               url, type(self._result['error']), str(self._result['error']))
417                 return None
418             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
419                          self._result['status_code'],
420                          len(self._result['body']),
421                          t.msecs,
422                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
423             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
424             if resp_md5 != locator.md5sum:
425                 _logger.warning("Checksum fail: md5(%s) = %s",
426                                 url, resp_md5)
427                 self._result['error'] = arvados.errors.HttpError(
428                     0, 'Checksum fail')
429                 return None
430             return self._result['body']
431
432         def put(self, hash_s, body, timeout=None):
433             url = self.root + hash_s
434             _logger.debug("Request: PUT %s", url)
435             curl = self._get_user_agent()
436             try:
437                 self._headers = {}
438                 body_reader = cStringIO.StringIO(body)
439                 response_body = cStringIO.StringIO()
440                 curl.setopt(pycurl.NOSIGNAL, 1)
441                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
442                 curl.setopt(pycurl.URL, url.encode('utf-8'))
443                 # Using UPLOAD tells cURL to wait for a "go ahead" from the
444                 # Keep server (in the form of a HTTP/1.1 "100 Continue"
445                 # response) instead of sending the request body immediately.
446                 # This allows the server to reject the request if the request
447                 # is invalid or the server is read-only, without waiting for
448                 # the client to send the entire block.
449                 curl.setopt(pycurl.UPLOAD, True)
450                 curl.setopt(pycurl.INFILESIZE, len(body))
451                 curl.setopt(pycurl.READFUNCTION, body_reader.read)
452                 curl.setopt(pycurl.HTTPHEADER, [
453                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
454                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
455                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
456                 self._setcurltimeouts(curl, timeout)
457                 try:
458                     curl.perform()
459                 except Exception as e:
460                     raise arvados.errors.HttpError(0, str(e))
461                 self._result = {
462                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
463                     'body': response_body.getvalue(),
464                     'headers': self._headers,
465                     'error': False,
466                 }
467                 ok = retry.check_http_response_success(self._result['status_code'])
468                 if not ok:
469                     self._result['error'] = arvados.errors.HttpError(
470                         self._result['status_code'],
471                         self._headers.get('x-status-line', 'Error'))
472             except self.HTTP_ERRORS as e:
473                 self._result = {
474                     'error': e,
475                 }
476                 ok = False
477             self._usable = ok != False # still usable if ok is True or None
478             if self._result.get('status_code', None):
479                 # Client is functional. See comment in get().
480                 self._put_user_agent(curl)
481             else:
482                 curl.close()
483             if not ok:
484                 _logger.debug("Request fail: PUT %s => %s: %s",
485                               url, type(self._result['error']), str(self._result['error']))
486                 return False
487             return True
488
489         def _setcurltimeouts(self, curl, timeouts):
490             if not timeouts:
491                 return
492             elif isinstance(timeouts, tuple):
493                 conn_t, xfer_t = timeouts
494             else:
495                 conn_t, xfer_t = (timeouts, timeouts)
496             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
497             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
498
499         def _headerfunction(self, header_line):
500             header_line = header_line.decode('iso-8859-1')
501             if ':' in header_line:
502                 name, value = header_line.split(':', 1)
503                 name = name.strip().lower()
504                 value = value.strip()
505             elif self._headers:
506                 name = self._lastheadername
507                 value = self._headers[name] + ' ' + header_line.strip()
508             elif header_line.startswith('HTTP/'):
509                 name = 'x-status-line'
510                 value = header_line
511             else:
512                 _logger.error("Unexpected header line: %s", header_line)
513                 return
514             self._lastheadername = name
515             self._headers[name] = value
516             # Returning None implies all bytes were written
517
518
519     class KeepWriterThread(threading.Thread):
520         """
521         Write a blob of data to the given Keep server. On success, call
522         save_response() of the given ThreadLimiter to save the returned
523         locator.
524         """
525         def __init__(self, keep_service, **kwargs):
526             super(KeepClient.KeepWriterThread, self).__init__()
527             self.service = keep_service
528             self.args = kwargs
529             self._success = False
530
531         def success(self):
532             return self._success
533
534         def run(self):
535             limiter = self.args['thread_limiter']
536             sequence = self.args['thread_sequence']
537             if sequence is not None:
538                 limiter.set_sequence(sequence)
539             with limiter:
540                 if not limiter.shall_i_proceed():
541                     # My turn arrived, but the job has been done without
542                     # me.
543                     return
544                 self.run_with_limiter(limiter)
545
546         def run_with_limiter(self, limiter):
547             if self.service.finished():
548                 return
549             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
550                           str(threading.current_thread()),
551                           self.args['data_hash'],
552                           len(self.args['data']),
553                           self.args['service_root'])
554             self._success = bool(self.service.put(
555                 self.args['data_hash'],
556                 self.args['data'],
557                 timeout=self.args.get('timeout', None)))
558             result = self.service.last_result()
559             if self._success:
560                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
561                               str(threading.current_thread()),
562                               self.args['data_hash'],
563                               len(self.args['data']),
564                               self.args['service_root'])
565                 # Tick the 'done' counter for the number of replica
566                 # reported stored by the server, for the case that
567                 # we're talking to a proxy or other backend that
568                 # stores to multiple copies for us.
569                 try:
570                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
571                 except (KeyError, ValueError):
572                     replicas_stored = 1
573                 limiter.save_response(result['body'].strip(), replicas_stored)
574             elif result.get('status_code', None):
575                 _logger.debug("Request fail: PUT %s => %s %s",
576                               self.args['data_hash'],
577                               result['status_code'],
578                               result['body'])
579
580
581     def __init__(self, api_client=None, proxy=None,
582                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
583                  api_token=None, local_store=None, block_cache=None,
584                  num_retries=0, session=None):
585         """Initialize a new KeepClient.
586
587         Arguments:
588         :api_client:
589           The API client to use to find Keep services.  If not
590           provided, KeepClient will build one from available Arvados
591           configuration.
592
593         :proxy:
594           If specified, this KeepClient will send requests to this Keep
595           proxy.  Otherwise, KeepClient will fall back to the setting of the
596           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
597           KeepClient does not use a proxy, pass in an empty string.
598
599         :timeout:
600           The initial timeout (in seconds) for HTTP requests to Keep
601           non-proxy servers.  A tuple of two floats is interpreted as
602           (connection_timeout, read_timeout): see
603           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
604           Because timeouts are often a result of transient server load, the
605           actual connection timeout will be increased by a factor of two on
606           each retry.
607           Default: (2, 300).
608
609         :proxy_timeout:
610           The initial timeout (in seconds) for HTTP requests to
611           Keep proxies. A tuple of two floats is interpreted as
612           (connection_timeout, read_timeout). The behavior described
613           above for adjusting connection timeouts on retry also applies.
614           Default: (20, 300).
615
616         :api_token:
617           If you're not using an API client, but only talking
618           directly to a Keep proxy, this parameter specifies an API token
619           to authenticate Keep requests.  It is an error to specify both
620           api_client and api_token.  If you specify neither, KeepClient
621           will use one available from the Arvados configuration.
622
623         :local_store:
624           If specified, this KeepClient will bypass Keep
625           services, and save data to the named directory.  If unspecified,
626           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
627           environment variable.  If you want to ensure KeepClient does not
628           use local storage, pass in an empty string.  This is primarily
629           intended to mock a server for testing.
630
631         :num_retries:
632           The default number of times to retry failed requests.
633           This will be used as the default num_retries value when get() and
634           put() are called.  Default 0.
635         """
636         self.lock = threading.Lock()
637         if proxy is None:
638             proxy = config.get('ARVADOS_KEEP_PROXY')
639         if api_token is None:
640             if api_client is None:
641                 api_token = config.get('ARVADOS_API_TOKEN')
642             else:
643                 api_token = api_client.api_token
644         elif api_client is not None:
645             raise ValueError(
646                 "can't build KeepClient with both API client and token")
647         if local_store is None:
648             local_store = os.environ.get('KEEP_LOCAL_STORE')
649
650         self.block_cache = block_cache if block_cache else KeepBlockCache()
651         self.timeout = timeout
652         self.proxy_timeout = proxy_timeout
653         self._user_agent_pool = Queue.LifoQueue()
654
655         if local_store:
656             self.local_store = local_store
657             self.get = self.local_store_get
658             self.put = self.local_store_put
659         else:
660             self.num_retries = num_retries
661             if proxy:
662                 if not proxy.endswith('/'):
663                     proxy += '/'
664                 self.api_token = api_token
665                 self._gateway_services = {}
666                 self._keep_services = [{
667                     'uuid': 'proxy',
668                     '_service_root': proxy,
669                     }]
670                 self._writable_services = self._keep_services
671                 self.using_proxy = True
672                 self._static_services_list = True
673                 self.max_replicas_per_service = 1
674             else:
675                 # It's important to avoid instantiating an API client
676                 # unless we actually need one, for testing's sake.
677                 if api_client is None:
678                     api_client = arvados.api('v1')
679                 self.api_client = api_client
680                 self.api_token = api_client.api_token
681                 self._gateway_services = {}
682                 self._keep_services = None
683                 self._writable_services = None
684                 self.using_proxy = None
685                 self._static_services_list = False
686                 self.max_replicas_per_service = 1
687
688     def current_timeout(self, attempt_number):
689         """Return the appropriate timeout to use for this client.
690
691         The proxy timeout setting if the backend service is currently a proxy,
692         the regular timeout setting otherwise.  The `attempt_number` indicates
693         how many times the operation has been tried already (starting from 0
694         for the first try), and scales the connection timeout portion of the
695         return value accordingly.
696
697         """
698         # TODO(twp): the timeout should be a property of a
699         # KeepService, not a KeepClient. See #4488.
700         t = self.proxy_timeout if self.using_proxy else self.timeout
701         return (t[0] * (1 << attempt_number), t[1])
702
703     def build_services_list(self, force_rebuild=False):
704         if (self._static_services_list or
705               (self._keep_services and not force_rebuild)):
706             return
707         with self.lock:
708             try:
709                 keep_services = self.api_client.keep_services().accessible()
710             except Exception:  # API server predates Keep services.
711                 keep_services = self.api_client.keep_disks().list()
712
713             accessible = keep_services.execute().get('items')
714             if not accessible:
715                 raise arvados.errors.NoKeepServersError()
716
717             # Precompute the base URI for each service.
718             for r in accessible:
719                 host = r['service_host']
720                 if not host.startswith('[') and host.find(':') >= 0:
721                     # IPv6 URIs must be formatted like http://[::1]:80/...
722                     host = '[' + host + ']'
723                 r['_service_root'] = "{}://{}:{:d}/".format(
724                     'https' if r['service_ssl_flag'] else 'http',
725                     host,
726                     r['service_port'])
727
728             # Gateway services are only used when specified by UUID,
729             # so there's nothing to gain by filtering them by
730             # service_type.
731             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
732             _logger.debug(str(self._gateway_services))
733
734             self._keep_services = [
735                 ks for ks in accessible
736                 if ks.get('service_type') in ['disk', 'proxy']]
737             self._writable_services = [
738                 ks for ks in accessible
739                 if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
740             _logger.debug(str(self._keep_services))
741
742             self.using_proxy = any(ks.get('service_type') == 'proxy'
743                                    for ks in self._keep_services)
744             # For disk type services, max_replicas_per_service is 1
745             # It is unknown or unlimited for non-disk typed services.
746             for ks in accessible:
747                 if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
748                     self.max_replicas_per_service = None
749
750     def _service_weight(self, data_hash, service_uuid):
751         """Compute the weight of a Keep service endpoint for a data
752         block with a known hash.
753
754         The weight is md5(h + u) where u is the last 15 characters of
755         the service endpoint's UUID.
756         """
757         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
758
759     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
760         """Return an array of Keep service endpoints, in the order in
761         which they should be probed when reading or writing data with
762         the given hash+hints.
763         """
764         self.build_services_list(force_rebuild)
765
766         sorted_roots = []
767         # Use the services indicated by the given +K@... remote
768         # service hints, if any are present and can be resolved to a
769         # URI.
770         for hint in locator.hints:
771             if hint.startswith('K@'):
772                 if len(hint) == 7:
773                     sorted_roots.append(
774                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
775                 elif len(hint) == 29:
776                     svc = self._gateway_services.get(hint[2:])
777                     if svc:
778                         sorted_roots.append(svc['_service_root'])
779
780         # Sort the available local services by weight (heaviest first)
781         # for this locator, and return their service_roots (base URIs)
782         # in that order.
783         use_services = self._keep_services
784         if need_writable:
785           use_services = self._writable_services
786         sorted_roots.extend([
787             svc['_service_root'] for svc in sorted(
788                 use_services,
789                 reverse=True,
790                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
791         _logger.debug("{}: {}".format(locator, sorted_roots))
792         return sorted_roots
793
794     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
795         # roots_map is a dictionary, mapping Keep service root strings
796         # to KeepService objects.  Poll for Keep services, and add any
797         # new ones to roots_map.  Return the current list of local
798         # root strings.
799         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
800         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
801         for root in local_roots:
802             if root not in roots_map:
803                 roots_map[root] = self.KeepService(
804                     root, self._user_agent_pool, **headers)
805         return local_roots
806
807     @staticmethod
808     def _check_loop_result(result):
809         # KeepClient RetryLoops should save results as a 2-tuple: the
810         # actual result of the request, and the number of servers available
811         # to receive the request this round.
812         # This method returns True if there's a real result, False if
813         # there are no more servers available, otherwise None.
814         if isinstance(result, Exception):
815             return None
816         result, tried_server_count = result
817         if (result is not None) and (result is not False):
818             return True
819         elif tried_server_count < 1:
820             _logger.info("No more Keep services to try; giving up")
821             return False
822         else:
823             return None
824
825     def get_from_cache(self, loc):
826         """Fetch a block only if is in the cache, otherwise return None."""
827         slot = self.block_cache.get(loc)
828         if slot is not None and slot.ready.is_set():
829             return slot.get()
830         else:
831             return None
832
833     @retry.retry_method
834     def get(self, loc_s, num_retries=None):
835         """Get data from Keep.
836
837         This method fetches one or more blocks of data from Keep.  It
838         sends a request each Keep service registered with the API
839         server (or the proxy provided when this client was
840         instantiated), then each service named in location hints, in
841         sequence.  As soon as one service provides the data, it's
842         returned.
843
844         Arguments:
845         * loc_s: A string of one or more comma-separated locators to fetch.
846           This method returns the concatenation of these blocks.
847         * num_retries: The number of times to retry GET requests to
848           *each* Keep server if it returns temporary failures, with
849           exponential backoff.  Note that, in each loop, the method may try
850           to fetch data from every available Keep service, along with any
851           that are named in location hints in the locator.  The default value
852           is set when the KeepClient is initialized.
853         """
854         if ',' in loc_s:
855             return ''.join(self.get(x) for x in loc_s.split(','))
856         locator = KeepLocator(loc_s)
857         slot, first = self.block_cache.reserve_cache(locator.md5sum)
858         if not first:
859             v = slot.get()
860             return v
861
862         # If the locator has hints specifying a prefix (indicating a
863         # remote keepproxy) or the UUID of a local gateway service,
864         # read data from the indicated service(s) instead of the usual
865         # list of local disk services.
866         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
867                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
868         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
869                            for hint in locator.hints if (
870                                    hint.startswith('K@') and
871                                    len(hint) == 29 and
872                                    self._gateway_services.get(hint[2:])
873                                    )])
874         # Map root URLs to their KeepService objects.
875         roots_map = {
876             root: self.KeepService(root, self._user_agent_pool)
877             for root in hint_roots
878         }
879
880         # See #3147 for a discussion of the loop implementation.  Highlights:
881         # * Refresh the list of Keep services after each failure, in case
882         #   it's being updated.
883         # * Retry until we succeed, we're out of retries, or every available
884         #   service has returned permanent failure.
885         sorted_roots = []
886         roots_map = {}
887         blob = None
888         loop = retry.RetryLoop(num_retries, self._check_loop_result,
889                                backoff_start=2)
890         for tries_left in loop:
891             try:
892                 sorted_roots = self.map_new_services(
893                     roots_map, locator,
894                     force_rebuild=(tries_left < num_retries),
895                     need_writable=False)
896             except Exception as error:
897                 loop.save_result(error)
898                 continue
899
900             # Query KeepService objects that haven't returned
901             # permanent failure, in our specified shuffle order.
902             services_to_try = [roots_map[root]
903                                for root in sorted_roots
904                                if roots_map[root].usable()]
905             for keep_service in services_to_try:
906                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
907                 if blob is not None:
908                     break
909             loop.save_result((blob, len(services_to_try)))
910
911         # Always cache the result, then return it if we succeeded.
912         slot.set(blob)
913         self.block_cache.cap_cache()
914         if loop.success():
915             return blob
916
917         # Q: Including 403 is necessary for the Keep tests to continue
918         # passing, but maybe they should expect KeepReadError instead?
919         not_founds = sum(1 for key in sorted_roots
920                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
921         service_errors = ((key, roots_map[key].last_result()['error'])
922                           for key in sorted_roots)
923         if not roots_map:
924             raise arvados.errors.KeepReadError(
925                 "failed to read {}: no Keep services available ({})".format(
926                     loc_s, loop.last_result()))
927         elif not_founds == len(sorted_roots):
928             raise arvados.errors.NotFoundError(
929                 "{} not found".format(loc_s), service_errors)
930         else:
931             raise arvados.errors.KeepReadError(
932                 "failed to read {}".format(loc_s), service_errors, label="service")
933
934     @retry.retry_method
935     def put(self, data, copies=2, num_retries=None):
936         """Save data in Keep.
937
938         This method will get a list of Keep services from the API server, and
939         send the data to each one simultaneously in a new thread.  Once the
940         uploads are finished, if enough copies are saved, this method returns
941         the most recent HTTP response body.  If requests fail to upload
942         enough copies, this method raises KeepWriteError.
943
944         Arguments:
945         * data: The string of data to upload.
946         * copies: The number of copies that the user requires be saved.
947           Default 2.
948         * num_retries: The number of times to retry PUT requests to
949           *each* Keep server if it returns temporary failures, with
950           exponential backoff.  The default value is set when the
951           KeepClient is initialized.
952         """
953
954         if isinstance(data, unicode):
955             data = data.encode("ascii")
956         elif not isinstance(data, str):
957             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
958
959         data_hash = hashlib.md5(data).hexdigest()
960         loc_s = data_hash + '+' + str(len(data))
961         if copies < 1:
962             return loc_s
963         locator = KeepLocator(loc_s)
964
965         headers = {}
966         # Tell the proxy how many copies we want it to store
967         headers['X-Keep-Desired-Replication'] = str(copies)
968         roots_map = {}
969         thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
970         loop = retry.RetryLoop(num_retries, self._check_loop_result,
971                                backoff_start=2)
972         thread_sequence = 0
973         for tries_left in loop:
974             try:
975                 sorted_roots = self.map_new_services(
976                     roots_map, locator,
977                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
978             except Exception as error:
979                 loop.save_result(error)
980                 continue
981
982             threads = []
983             for service_root, ks in [(root, roots_map[root])
984                                      for root in sorted_roots]:
985                 if ks.finished():
986                     continue
987                 t = KeepClient.KeepWriterThread(
988                     ks,
989                     data=data,
990                     data_hash=data_hash,
991                     service_root=service_root,
992                     thread_limiter=thread_limiter,
993                     timeout=self.current_timeout(num_retries-tries_left),
994                     thread_sequence=thread_sequence)
995                 t.start()
996                 threads.append(t)
997                 thread_sequence += 1
998             for t in threads:
999                 t.join()
1000             loop.save_result((thread_limiter.done() >= copies, len(threads)))
1001
1002         if loop.success():
1003             return thread_limiter.response()
1004         if not roots_map:
1005             raise arvados.errors.KeepWriteError(
1006                 "failed to write {}: no Keep services available ({})".format(
1007                     data_hash, loop.last_result()))
1008         else:
1009             service_errors = ((key, roots_map[key].last_result()['error'])
1010                               for key in sorted_roots
1011                               if roots_map[key].last_result()['error'])
1012             raise arvados.errors.KeepWriteError(
1013                 "failed to write {} (wanted {} copies but wrote {})".format(
1014                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
1015
1016     def local_store_put(self, data, copies=1, num_retries=None):
1017         """A stub for put().
1018
1019         This method is used in place of the real put() method when
1020         using local storage (see constructor's local_store argument).
1021
1022         copies and num_retries arguments are ignored: they are here
1023         only for the sake of offering the same call signature as
1024         put().
1025
1026         Data stored this way can be retrieved via local_store_get().
1027         """
1028         md5 = hashlib.md5(data).hexdigest()
1029         locator = '%s+%d' % (md5, len(data))
1030         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1031             f.write(data)
1032         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1033                   os.path.join(self.local_store, md5))
1034         return locator
1035
1036     def local_store_get(self, loc_s, num_retries=None):
1037         """Companion to local_store_put()."""
1038         try:
1039             locator = KeepLocator(loc_s)
1040         except ValueError:
1041             raise arvados.errors.NotFoundError(
1042                 "Invalid data locator: '%s'" % loc_s)
1043         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1044             return ''
1045         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1046             return f.read()
1047
1048     def is_cached(self, locator):
1049         return self.block_cache.reserve_cache(expect_hash)