7161: clarify max_replicas_per_service
[arvados.git] / sdk / python / arvados / keep.py
1 import bz2
2 import datetime
3 import fcntl
4 import functools
5 import gflags
6 import hashlib
7 import json
8 import logging
9 import os
10 import pprint
11 import pycurl
12 import Queue
13 import re
14 import socket
15 import ssl
16 import string
17 import cStringIO
18 import subprocess
19 import sys
20 import threading
21 import time
22 import timer
23 import types
24 import UserDict
25 import zlib
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.retry as retry
31 import arvados.util
32
33 _logger = logging.getLogger('arvados.keep')
34 global_client_object = None
35
36
37 class KeepLocator(object):
38     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
39     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
40
41     def __init__(self, locator_str):
42         self.hints = []
43         self._perm_sig = None
44         self._perm_expiry = None
45         pieces = iter(locator_str.split('+'))
46         self.md5sum = next(pieces)
47         try:
48             self.size = int(next(pieces))
49         except StopIteration:
50             self.size = None
51         for hint in pieces:
52             if self.HINT_RE.match(hint) is None:
53                 raise ValueError("invalid hint format: {}".format(hint))
54             elif hint.startswith('A'):
55                 self.parse_permission_hint(hint)
56             else:
57                 self.hints.append(hint)
58
59     def __str__(self):
60         return '+'.join(
61             str(s) for s in [self.md5sum, self.size,
62                              self.permission_hint()] + self.hints
63             if s is not None)
64
65     def stripped(self):
66         if self.size is not None:
67             return "%s+%i" % (self.md5sum, self.size)
68         else:
69             return self.md5sum
70
71     def _make_hex_prop(name, length):
72         # Build and return a new property with the given name that
73         # must be a hex string of the given length.
74         data_name = '_{}'.format(name)
75         def getter(self):
76             return getattr(self, data_name)
77         def setter(self, hex_str):
78             if not arvados.util.is_hex(hex_str, length):
79                 raise ValueError("{} is not a {}-digit hex string: {}".
80                                  format(name, length, hex_str))
81             setattr(self, data_name, hex_str)
82         return property(getter, setter)
83
84     md5sum = _make_hex_prop('md5sum', 32)
85     perm_sig = _make_hex_prop('perm_sig', 40)
86
87     @property
88     def perm_expiry(self):
89         return self._perm_expiry
90
91     @perm_expiry.setter
92     def perm_expiry(self, value):
93         if not arvados.util.is_hex(value, 1, 8):
94             raise ValueError(
95                 "permission timestamp must be a hex Unix timestamp: {}".
96                 format(value))
97         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
98
99     def permission_hint(self):
100         data = [self.perm_sig, self.perm_expiry]
101         if None in data:
102             return None
103         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
104         return "A{}@{:08x}".format(*data)
105
106     def parse_permission_hint(self, s):
107         try:
108             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
109         except IndexError:
110             raise ValueError("bad permission hint {}".format(s))
111
112     def permission_expired(self, as_of_dt=None):
113         if self.perm_expiry is None:
114             return False
115         elif as_of_dt is None:
116             as_of_dt = datetime.datetime.now()
117         return self.perm_expiry <= as_of_dt
118
119
120 class Keep(object):
121     """Simple interface to a global KeepClient object.
122
123     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
124     own API client.  The global KeepClient will build an API client from the
125     current Arvados configuration, which may not match the one you built.
126     """
127     _last_key = None
128
129     @classmethod
130     def global_client_object(cls):
131         global global_client_object
132         # Previously, KeepClient would change its behavior at runtime based
133         # on these configuration settings.  We simulate that behavior here
134         # by checking the values and returning a new KeepClient if any of
135         # them have changed.
136         key = (config.get('ARVADOS_API_HOST'),
137                config.get('ARVADOS_API_TOKEN'),
138                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139                config.get('ARVADOS_KEEP_PROXY'),
140                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141                os.environ.get('KEEP_LOCAL_STORE'))
142         if (global_client_object is None) or (cls._last_key != key):
143             global_client_object = KeepClient()
144             cls._last_key = key
145         return global_client_object
146
147     @staticmethod
148     def get(locator, **kwargs):
149         return Keep.global_client_object().get(locator, **kwargs)
150
151     @staticmethod
152     def put(data, **kwargs):
153         return Keep.global_client_object().put(data, **kwargs)
154
155 class KeepBlockCache(object):
156     # Default RAM cache is 256MiB
157     def __init__(self, cache_max=(256 * 1024 * 1024)):
158         self.cache_max = cache_max
159         self._cache = []
160         self._cache_lock = threading.Lock()
161
162     class CacheSlot(object):
163         def __init__(self, locator):
164             self.locator = locator
165             self.ready = threading.Event()
166             self.content = None
167
168         def get(self):
169             self.ready.wait()
170             return self.content
171
172         def set(self, value):
173             self.content = value
174             self.ready.set()
175
176         def size(self):
177             if self.content is None:
178                 return 0
179             else:
180                 return len(self.content)
181
182     def cap_cache(self):
183         '''Cap the cache size to self.cache_max'''
184         with self._cache_lock:
185             # Select all slots except those where ready.is_set() and content is
186             # None (that means there was an error reading the block).
187             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
188             sm = sum([slot.size() for slot in self._cache])
189             while len(self._cache) > 0 and sm > self.cache_max:
190                 for i in xrange(len(self._cache)-1, -1, -1):
191                     if self._cache[i].ready.is_set():
192                         del self._cache[i]
193                         break
194                 sm = sum([slot.size() for slot in self._cache])
195
196     def _get(self, locator):
197         # Test if the locator is already in the cache
198         for i in xrange(0, len(self._cache)):
199             if self._cache[i].locator == locator:
200                 n = self._cache[i]
201                 if i != 0:
202                     # move it to the front
203                     del self._cache[i]
204                     self._cache.insert(0, n)
205                 return n
206         return None
207
208     def get(self, locator):
209         with self._cache_lock:
210             return self._get(locator)
211
212     def reserve_cache(self, locator):
213         '''Reserve a cache slot for the specified locator,
214         or return the existing slot.'''
215         with self._cache_lock:
216             n = self._get(locator)
217             if n:
218                 return n, False
219             else:
220                 # Add a new cache slot for the locator
221                 n = KeepBlockCache.CacheSlot(locator)
222                 self._cache.insert(0, n)
223                 return n, True
224
225 class KeepClient(object):
226
227     # Default Keep server connection timeout:  2 seconds
228     # Default Keep server read timeout:      300 seconds
229     # Default Keep proxy connection timeout:  20 seconds
230     # Default Keep proxy read timeout:       300 seconds
231     DEFAULT_TIMEOUT = (2, 300)
232     DEFAULT_PROXY_TIMEOUT = (20, 300)
233
234     class ThreadLimiter(object):
235         """
236         Limit the number of threads running at a given time to
237         {desired successes} minus {successes reported}. When successes
238         reported == desired, wake up the remaining threads and tell
239         them to quit.
240
241         Should be used in a "with" block.
242         """
243         def __init__(self, todo):
244             self._todo = todo
245             self._done = 0
246             self._response = None
247             self._todo_lock = threading.Semaphore(todo)
248             self._done_lock = threading.Lock()
249
250         def __enter__(self):
251             self._todo_lock.acquire()
252             return self
253
254         def __exit__(self, type, value, traceback):
255             self._todo_lock.release()
256
257         def shall_i_proceed(self):
258             """
259             Return true if the current thread should do stuff. Return
260             false if the current thread should just stop.
261             """
262             with self._done_lock:
263                 return (self._done < self._todo)
264
265         def save_response(self, response_body, replicas_stored):
266             """
267             Records a response body (a locator, possibly signed) returned by
268             the Keep server.  It is not necessary to save more than
269             one response, since we presume that any locator returned
270             in response to a successful request is valid.
271             """
272             with self._done_lock:
273                 self._done += replicas_stored
274                 self._response = response_body
275
276         def response(self):
277             """
278             Returns the body from the response to a PUT request.
279             """
280             with self._done_lock:
281                 return self._response
282
283         def done(self):
284             """
285             Return how many successes were reported.
286             """
287             with self._done_lock:
288                 return self._done
289
290
291     class KeepService(object):
292         """Make requests to a single Keep service, and track results.
293
294         A KeepService is intended to last long enough to perform one
295         transaction (GET or PUT) against one Keep service. This can
296         involve calling either get() or put() multiple times in order
297         to retry after transient failures. However, calling both get()
298         and put() on a single instance -- or using the same instance
299         to access two different Keep services -- will not produce
300         sensible behavior.
301         """
302
303         HTTP_ERRORS = (
304             socket.error,
305             ssl.SSLError,
306             arvados.errors.HttpError,
307         )
308
309         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
310             self.root = root
311             self._user_agent_pool = user_agent_pool
312             self._result = {'error': None}
313             self._usable = True
314             self._session = None
315             self.get_headers = {'Accept': 'application/octet-stream'}
316             self.get_headers.update(headers)
317             self.put_headers = headers
318
319         def usable(self):
320             """Is it worth attempting a request?"""
321             return self._usable
322
323         def finished(self):
324             """Did the request succeed or encounter permanent failure?"""
325             return self._result['error'] == False or not self._usable
326
327         def last_result(self):
328             return self._result
329
330         def _get_user_agent(self):
331             try:
332                 return self._user_agent_pool.get(False)
333             except Queue.Empty:
334                 return pycurl.Curl()
335
336         def _put_user_agent(self, ua):
337             try:
338                 ua.reset()
339                 self._user_agent_pool.put(ua, False)
340             except:
341                 ua.close()
342
343         @staticmethod
344         def _socket_open(family, socktype, protocol, address=None):
345             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
346             s = socket.socket(family, socktype, protocol)
347             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
348             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
349             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
350             return s
351
352         def get(self, locator, timeout=None):
353             # locator is a KeepLocator object.
354             url = self.root + str(locator)
355             _logger.debug("Request: GET %s", url)
356             curl = self._get_user_agent()
357             try:
358                 with timer.Timer() as t:
359                     self._headers = {}
360                     response_body = cStringIO.StringIO()
361                     curl.setopt(pycurl.NOSIGNAL, 1)
362                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
363                     curl.setopt(pycurl.URL, url.encode('utf-8'))
364                     curl.setopt(pycurl.HTTPHEADER, [
365                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
366                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
367                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
368                     self._setcurltimeouts(curl, timeout)
369                     try:
370                         curl.perform()
371                     except Exception as e:
372                         raise arvados.errors.HttpError(0, str(e))
373                     self._result = {
374                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
375                         'body': response_body.getvalue(),
376                         'headers': self._headers,
377                         'error': False,
378                     }
379                 ok = retry.check_http_response_success(self._result['status_code'])
380                 if not ok:
381                     self._result['error'] = arvados.errors.HttpError(
382                         self._result['status_code'],
383                         self._headers.get('x-status-line', 'Error'))
384             except self.HTTP_ERRORS as e:
385                 self._result = {
386                     'error': e,
387                 }
388                 ok = False
389             self._usable = ok != False
390             if self._result.get('status_code', None):
391                 # The client worked well enough to get an HTTP status
392                 # code, so presumably any problems are just on the
393                 # server side and it's OK to reuse the client.
394                 self._put_user_agent(curl)
395             else:
396                 # Don't return this client to the pool, in case it's
397                 # broken.
398                 curl.close()
399             if not ok:
400                 _logger.debug("Request fail: GET %s => %s: %s",
401                               url, type(self._result['error']), str(self._result['error']))
402                 return None
403             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
404                          self._result['status_code'],
405                          len(self._result['body']),
406                          t.msecs,
407                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
408             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
409             if resp_md5 != locator.md5sum:
410                 _logger.warning("Checksum fail: md5(%s) = %s",
411                                 url, resp_md5)
412                 self._result['error'] = arvados.errors.HttpError(
413                     0, 'Checksum fail')
414                 return None
415             return self._result['body']
416
417         def put(self, hash_s, body, timeout=None):
418             url = self.root + hash_s
419             _logger.debug("Request: PUT %s", url)
420             curl = self._get_user_agent()
421             try:
422                 self._headers = {}
423                 body_reader = cStringIO.StringIO(body)
424                 response_body = cStringIO.StringIO()
425                 curl.setopt(pycurl.NOSIGNAL, 1)
426                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
427                 curl.setopt(pycurl.URL, url.encode('utf-8'))
428                 # Using UPLOAD tells cURL to wait for a "go ahead" from the
429                 # Keep server (in the form of a HTTP/1.1 "100 Continue"
430                 # response) instead of sending the request body immediately.
431                 # This allows the server to reject the request if the request
432                 # is invalid or the server is read-only, without waiting for
433                 # the client to send the entire block.
434                 curl.setopt(pycurl.UPLOAD, True)
435                 curl.setopt(pycurl.INFILESIZE, len(body))
436                 curl.setopt(pycurl.READFUNCTION, body_reader.read)
437                 curl.setopt(pycurl.HTTPHEADER, [
438                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
439                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
440                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
441                 self._setcurltimeouts(curl, timeout)
442                 try:
443                     curl.perform()
444                 except Exception as e:
445                     raise arvados.errors.HttpError(0, str(e))
446                 self._result = {
447                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
448                     'body': response_body.getvalue(),
449                     'headers': self._headers,
450                     'error': False,
451                 }
452                 ok = retry.check_http_response_success(self._result['status_code'])
453                 if not ok:
454                     self._result['error'] = arvados.errors.HttpError(
455                         self._result['status_code'],
456                         self._headers.get('x-status-line', 'Error'))
457             except self.HTTP_ERRORS as e:
458                 self._result = {
459                     'error': e,
460                 }
461                 ok = False
462             self._usable = ok != False # still usable if ok is True or None
463             if self._result.get('status_code', None):
464                 # Client is functional. See comment in get().
465                 self._put_user_agent(curl)
466             else:
467                 curl.close()
468             if not ok:
469                 _logger.debug("Request fail: PUT %s => %s: %s",
470                               url, type(self._result['error']), str(self._result['error']))
471                 return False
472             return True
473
474         def _setcurltimeouts(self, curl, timeouts):
475             if not timeouts:
476                 return
477             elif isinstance(timeouts, tuple):
478                 conn_t, xfer_t = timeouts
479             else:
480                 conn_t, xfer_t = (timeouts, timeouts)
481             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
482             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
483
484         def _headerfunction(self, header_line):
485             header_line = header_line.decode('iso-8859-1')
486             if ':' in header_line:
487                 name, value = header_line.split(':', 1)
488                 name = name.strip().lower()
489                 value = value.strip()
490             elif self._headers:
491                 name = self._lastheadername
492                 value = self._headers[name] + ' ' + header_line.strip()
493             elif header_line.startswith('HTTP/'):
494                 name = 'x-status-line'
495                 value = header_line
496             else:
497                 _logger.error("Unexpected header line: %s", header_line)
498                 return
499             self._lastheadername = name
500             self._headers[name] = value
501             # Returning None implies all bytes were written
502
503
504     class KeepWriterThread(threading.Thread):
505         """
506         Write a blob of data to the given Keep server. On success, call
507         save_response() of the given ThreadLimiter to save the returned
508         locator.
509         """
510         def __init__(self, keep_service, **kwargs):
511             super(KeepClient.KeepWriterThread, self).__init__()
512             self.service = keep_service
513             self.args = kwargs
514             self._success = False
515
516         def success(self):
517             return self._success
518
519         def run(self):
520             with self.args['thread_limiter'] as limiter:
521                 if not limiter.shall_i_proceed():
522                     # My turn arrived, but the job has been done without
523                     # me.
524                     return
525                 self.run_with_limiter(limiter)
526
527         def run_with_limiter(self, limiter):
528             if self.service.finished():
529                 return
530             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
531                           str(threading.current_thread()),
532                           self.args['data_hash'],
533                           len(self.args['data']),
534                           self.args['service_root'])
535             self._success = bool(self.service.put(
536                 self.args['data_hash'],
537                 self.args['data'],
538                 timeout=self.args.get('timeout', None)))
539             result = self.service.last_result()
540             if self._success:
541                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
542                               str(threading.current_thread()),
543                               self.args['data_hash'],
544                               len(self.args['data']),
545                               self.args['service_root'])
546                 # Tick the 'done' counter for the number of replica
547                 # reported stored by the server, for the case that
548                 # we're talking to a proxy or other backend that
549                 # stores to multiple copies for us.
550                 try:
551                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
552                 except (KeyError, ValueError):
553                     replicas_stored = 1
554                 limiter.save_response(result['body'].strip(), replicas_stored)
555             elif result.get('status_code', None):
556                 _logger.debug("Request fail: PUT %s => %s %s",
557                               self.args['data_hash'],
558                               result['status_code'],
559                               result['body'])
560
561
562     def __init__(self, api_client=None, proxy=None,
563                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
564                  api_token=None, local_store=None, block_cache=None,
565                  num_retries=0, session=None):
566         """Initialize a new KeepClient.
567
568         Arguments:
569         :api_client:
570           The API client to use to find Keep services.  If not
571           provided, KeepClient will build one from available Arvados
572           configuration.
573
574         :proxy:
575           If specified, this KeepClient will send requests to this Keep
576           proxy.  Otherwise, KeepClient will fall back to the setting of the
577           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
578           KeepClient does not use a proxy, pass in an empty string.
579
580         :timeout:
581           The initial timeout (in seconds) for HTTP requests to Keep
582           non-proxy servers.  A tuple of two floats is interpreted as
583           (connection_timeout, read_timeout): see
584           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
585           Because timeouts are often a result of transient server load, the
586           actual connection timeout will be increased by a factor of two on
587           each retry.
588           Default: (2, 300).
589
590         :proxy_timeout:
591           The initial timeout (in seconds) for HTTP requests to
592           Keep proxies. A tuple of two floats is interpreted as
593           (connection_timeout, read_timeout). The behavior described
594           above for adjusting connection timeouts on retry also applies.
595           Default: (20, 300).
596
597         :api_token:
598           If you're not using an API client, but only talking
599           directly to a Keep proxy, this parameter specifies an API token
600           to authenticate Keep requests.  It is an error to specify both
601           api_client and api_token.  If you specify neither, KeepClient
602           will use one available from the Arvados configuration.
603
604         :local_store:
605           If specified, this KeepClient will bypass Keep
606           services, and save data to the named directory.  If unspecified,
607           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
608           environment variable.  If you want to ensure KeepClient does not
609           use local storage, pass in an empty string.  This is primarily
610           intended to mock a server for testing.
611
612         :num_retries:
613           The default number of times to retry failed requests.
614           This will be used as the default num_retries value when get() and
615           put() are called.  Default 0.
616         """
617         self.lock = threading.Lock()
618         if proxy is None:
619             proxy = config.get('ARVADOS_KEEP_PROXY')
620         if api_token is None:
621             if api_client is None:
622                 api_token = config.get('ARVADOS_API_TOKEN')
623             else:
624                 api_token = api_client.api_token
625         elif api_client is not None:
626             raise ValueError(
627                 "can't build KeepClient with both API client and token")
628         if local_store is None:
629             local_store = os.environ.get('KEEP_LOCAL_STORE')
630
631         self.block_cache = block_cache if block_cache else KeepBlockCache()
632         self.timeout = timeout
633         self.proxy_timeout = proxy_timeout
634         self._user_agent_pool = Queue.LifoQueue()
635
636         if local_store:
637             self.local_store = local_store
638             self.get = self.local_store_get
639             self.put = self.local_store_put
640         else:
641             self.num_retries = num_retries
642             if proxy:
643                 if not proxy.endswith('/'):
644                     proxy += '/'
645                 self.api_token = api_token
646                 self._gateway_services = {}
647                 self._keep_services = [{
648                     'uuid': 'proxy',
649                     '_service_root': proxy,
650                     }]
651                 self._writable_services = self._keep_services
652                 self.using_proxy = True
653                 self._static_services_list = True
654                 self.max_replicas_per_service = None
655             else:
656                 # It's important to avoid instantiating an API client
657                 # unless we actually need one, for testing's sake.
658                 if api_client is None:
659                     api_client = arvados.api('v1')
660                 self.api_client = api_client
661                 self.api_token = api_client.api_token
662                 self._gateway_services = {}
663                 self._keep_services = None
664                 self._writable_services = None
665                 self.using_proxy = None
666                 self._static_services_list = False
667                 self.max_replicas_per_service = None
668
669     def current_timeout(self, attempt_number):
670         """Return the appropriate timeout to use for this client.
671
672         The proxy timeout setting if the backend service is currently a proxy,
673         the regular timeout setting otherwise.  The `attempt_number` indicates
674         how many times the operation has been tried already (starting from 0
675         for the first try), and scales the connection timeout portion of the
676         return value accordingly.
677
678         """
679         # TODO(twp): the timeout should be a property of a
680         # KeepService, not a KeepClient. See #4488.
681         t = self.proxy_timeout if self.using_proxy else self.timeout
682         return (t[0] * (1 << attempt_number), t[1])
683
684     def build_services_list(self, force_rebuild=False):
685         if (self._static_services_list or
686               (self._keep_services and not force_rebuild)):
687             return
688         with self.lock:
689             try:
690                 keep_services = self.api_client.keep_services().accessible()
691             except Exception:  # API server predates Keep services.
692                 keep_services = self.api_client.keep_disks().list()
693
694             accessible = keep_services.execute().get('items')
695             if not accessible:
696                 raise arvados.errors.NoKeepServersError()
697
698             # Precompute the base URI for each service.
699             for r in accessible:
700                 host = r['service_host']
701                 if not host.startswith('[') and host.find(':') >= 0:
702                     # IPv6 URIs must be formatted like http://[::1]:80/...
703                     host = '[' + host + ']'
704                 r['_service_root'] = "{}://{}:{:d}/".format(
705                     'https' if r['service_ssl_flag'] else 'http',
706                     host,
707                     r['service_port'])
708
709             # Gateway services are only used when specified by UUID,
710             # so there's nothing to gain by filtering them by
711             # service_type.
712             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
713             _logger.debug(str(self._gateway_services))
714
715             self._keep_services = [
716                 ks for ks in accessible
717                 if ks.get('service_type') in ['disk', 'proxy']]
718             self._writable_services = [
719                 ks for ks in accessible
720                 if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
721             _logger.debug(str(self._keep_services))
722
723             self.using_proxy = any(ks.get('service_type') == 'proxy'
724                                    for ks in self._keep_services)
725             # Set max_replicas_per_service to 1 for disk typed services.
726             # In case of, non-disk typed services, we will use None to mean unknown.
727             self.max_replicas_per_service = 1
728             for ks in accessible:
729                 if ('disk' != ks.get('service_type')) and not ks.get('read_only'):
730                     self.max_replicas_per_service = None
731
732     def _service_weight(self, data_hash, service_uuid):
733         """Compute the weight of a Keep service endpoint for a data
734         block with a known hash.
735
736         The weight is md5(h + u) where u is the last 15 characters of
737         the service endpoint's UUID.
738         """
739         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
740
741     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
742         """Return an array of Keep service endpoints, in the order in
743         which they should be probed when reading or writing data with
744         the given hash+hints.
745         """
746         self.build_services_list(force_rebuild)
747
748         sorted_roots = []
749         # Use the services indicated by the given +K@... remote
750         # service hints, if any are present and can be resolved to a
751         # URI.
752         for hint in locator.hints:
753             if hint.startswith('K@'):
754                 if len(hint) == 7:
755                      sorted_roots.append(
756                          "https://keep.{}.arvadosapi.com/".format(hint[2:]))
757                 elif len(hint) == 29:
758                     svc = self._gateway_services.get(hint[2:])
759                     if svc:
760                         sorted_roots.append(svc['_service_root'])
761
762         # Sort the available local services by weight (heaviest first)
763         # for this locator, and return their service_roots (base URIs)
764         # in that order.
765         use_services = self._keep_services
766         if need_writable:
767           use_services = self._writable_services
768         sorted_roots.extend([
769             svc['_service_root'] for svc in sorted(
770                 use_services,
771                 reverse=True,
772                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
773         _logger.debug("{}: {}".format(locator, sorted_roots))
774         return sorted_roots
775
776     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
777         # roots_map is a dictionary, mapping Keep service root strings
778         # to KeepService objects.  Poll for Keep services, and add any
779         # new ones to roots_map.  Return the current list of local
780         # root strings.
781         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
782         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
783         for root in local_roots:
784             if root not in roots_map:
785                 roots_map[root] = self.KeepService(
786                     root, self._user_agent_pool, **headers)
787         return local_roots
788
789     @staticmethod
790     def _check_loop_result(result):
791         # KeepClient RetryLoops should save results as a 2-tuple: the
792         # actual result of the request, and the number of servers available
793         # to receive the request this round.
794         # This method returns True if there's a real result, False if
795         # there are no more servers available, otherwise None.
796         if isinstance(result, Exception):
797             return None
798         result, tried_server_count = result
799         if (result is not None) and (result is not False):
800             return True
801         elif tried_server_count < 1:
802             _logger.info("No more Keep services to try; giving up")
803             return False
804         else:
805             return None
806
807     def get_from_cache(self, loc):
808         """Fetch a block only if is in the cache, otherwise return None."""
809         slot = self.block_cache.get(loc)
810         if slot is not None and slot.ready.is_set():
811             return slot.get()
812         else:
813             return None
814
815     @retry.retry_method
816     def get(self, loc_s, num_retries=None):
817         """Get data from Keep.
818
819         This method fetches one or more blocks of data from Keep.  It
820         sends a request each Keep service registered with the API
821         server (or the proxy provided when this client was
822         instantiated), then each service named in location hints, in
823         sequence.  As soon as one service provides the data, it's
824         returned.
825
826         Arguments:
827         * loc_s: A string of one or more comma-separated locators to fetch.
828           This method returns the concatenation of these blocks.
829         * num_retries: The number of times to retry GET requests to
830           *each* Keep server if it returns temporary failures, with
831           exponential backoff.  Note that, in each loop, the method may try
832           to fetch data from every available Keep service, along with any
833           that are named in location hints in the locator.  The default value
834           is set when the KeepClient is initialized.
835         """
836         if ',' in loc_s:
837             return ''.join(self.get(x) for x in loc_s.split(','))
838         locator = KeepLocator(loc_s)
839         slot, first = self.block_cache.reserve_cache(locator.md5sum)
840         if not first:
841             v = slot.get()
842             return v
843
844         # If the locator has hints specifying a prefix (indicating a
845         # remote keepproxy) or the UUID of a local gateway service,
846         # read data from the indicated service(s) instead of the usual
847         # list of local disk services.
848         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
849                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
850         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
851                            for hint in locator.hints if (
852                                    hint.startswith('K@') and
853                                    len(hint) == 29 and
854                                    self._gateway_services.get(hint[2:])
855                                    )])
856         # Map root URLs to their KeepService objects.
857         roots_map = {
858             root: self.KeepService(root, self._user_agent_pool)
859             for root in hint_roots
860         }
861
862         # See #3147 for a discussion of the loop implementation.  Highlights:
863         # * Refresh the list of Keep services after each failure, in case
864         #   it's being updated.
865         # * Retry until we succeed, we're out of retries, or every available
866         #   service has returned permanent failure.
867         sorted_roots = []
868         roots_map = {}
869         blob = None
870         loop = retry.RetryLoop(num_retries, self._check_loop_result,
871                                backoff_start=2)
872         for tries_left in loop:
873             try:
874                 sorted_roots = self.map_new_services(
875                     roots_map, locator,
876                     force_rebuild=(tries_left < num_retries),
877                     need_writable=False)
878             except Exception as error:
879                 loop.save_result(error)
880                 continue
881
882             # Query KeepService objects that haven't returned
883             # permanent failure, in our specified shuffle order.
884             services_to_try = [roots_map[root]
885                                for root in sorted_roots
886                                if roots_map[root].usable()]
887             for keep_service in services_to_try:
888                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
889                 if blob is not None:
890                     break
891             loop.save_result((blob, len(services_to_try)))
892
893         # Always cache the result, then return it if we succeeded.
894         slot.set(blob)
895         self.block_cache.cap_cache()
896         if loop.success():
897             return blob
898
899         # Q: Including 403 is necessary for the Keep tests to continue
900         # passing, but maybe they should expect KeepReadError instead?
901         not_founds = sum(1 for key in sorted_roots
902                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
903         service_errors = ((key, roots_map[key].last_result()['error'])
904                           for key in sorted_roots)
905         if not roots_map:
906             raise arvados.errors.KeepReadError(
907                 "failed to read {}: no Keep services available ({})".format(
908                     loc_s, loop.last_result()))
909         elif not_founds == len(sorted_roots):
910             raise arvados.errors.NotFoundError(
911                 "{} not found".format(loc_s), service_errors)
912         else:
913             raise arvados.errors.KeepReadError(
914                 "failed to read {}".format(loc_s), service_errors, label="service")
915
916     @retry.retry_method
917     def put(self, data, copies=2, num_retries=None):
918         """Save data in Keep.
919
920         This method will get a list of Keep services from the API server, and
921         send the data to each one simultaneously in a new thread.  Once the
922         uploads are finished, if enough copies are saved, this method returns
923         the most recent HTTP response body.  If requests fail to upload
924         enough copies, this method raises KeepWriteError.
925
926         Arguments:
927         * data: The string of data to upload.
928         * copies: The number of copies that the user requires be saved.
929           Default 2.
930         * num_retries: The number of times to retry PUT requests to
931           *each* Keep server if it returns temporary failures, with
932           exponential backoff.  The default value is set when the
933           KeepClient is initialized.
934         """
935
936         if isinstance(data, unicode):
937             data = data.encode("ascii")
938         elif not isinstance(data, str):
939             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
940
941         data_hash = hashlib.md5(data).hexdigest()
942         loc_s = data_hash + '+' + str(len(data))
943         if copies < 1:
944             return loc_s
945         locator = KeepLocator(loc_s)
946
947         headers = {}
948         # Tell the proxy how many copies we want it to store
949         headers['X-Keep-Desired-Replication'] = str(copies)
950         roots_map = {}
951         thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
952         loop = retry.RetryLoop(num_retries, self._check_loop_result,
953                                backoff_start=2)
954         for tries_left in loop:
955             try:
956                 local_roots = self.map_new_services(
957                     roots_map, locator,
958                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
959             except Exception as error:
960                 loop.save_result(error)
961                 continue
962
963             threads = []
964             for service_root, ks in roots_map.iteritems():
965                 if ks.finished():
966                     continue
967                 t = KeepClient.KeepWriterThread(
968                     ks,
969                     data=data,
970                     data_hash=data_hash,
971                     service_root=service_root,
972                     thread_limiter=thread_limiter,
973                     timeout=self.current_timeout(num_retries-tries_left))
974                 t.start()
975                 threads.append(t)
976             for t in threads:
977                 t.join()
978             loop.save_result((thread_limiter.done() >= copies, len(threads)))
979
980         if loop.success():
981             return thread_limiter.response()
982         if not roots_map:
983             raise arvados.errors.KeepWriteError(
984                 "failed to write {}: no Keep services available ({})".format(
985                     data_hash, loop.last_result()))
986         else:
987             service_errors = ((key, roots_map[key].last_result()['error'])
988                               for key in local_roots
989                               if roots_map[key].last_result()['error'])
990             raise arvados.errors.KeepWriteError(
991                 "failed to write {} (wanted {} copies but wrote {})".format(
992                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
993
994     def local_store_put(self, data, copies=1, num_retries=None):
995         """A stub for put().
996
997         This method is used in place of the real put() method when
998         using local storage (see constructor's local_store argument).
999
1000         copies and num_retries arguments are ignored: they are here
1001         only for the sake of offering the same call signature as
1002         put().
1003
1004         Data stored this way can be retrieved via local_store_get().
1005         """
1006         md5 = hashlib.md5(data).hexdigest()
1007         locator = '%s+%d' % (md5, len(data))
1008         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1009             f.write(data)
1010         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1011                   os.path.join(self.local_store, md5))
1012         return locator
1013
1014     def local_store_get(self, loc_s, num_retries=None):
1015         """Companion to local_store_put()."""
1016         try:
1017             locator = KeepLocator(loc_s)
1018         except ValueError:
1019             raise arvados.errors.NotFoundError(
1020                 "Invalid data locator: '%s'" % loc_s)
1021         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1022             return ''
1023         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1024             return f.read()
1025
1026     def is_cached(self, locator):
1027         return self.block_cache.reserve_cache(expect_hash)