Merge branch '7696-pysdk-all-keep-service-types-wip'
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import threading
13 import timer
14
15 import arvados
16 import arvados.config as config
17 import arvados.errors
18 import arvados.retry as retry
19 import arvados.util
20
21 _logger = logging.getLogger('arvados.keep')
22 global_client_object = None
23
24
class KeepLocator(object):
    """Parse and manipulate one Keep block locator string.

    A locator looks like "md5sum[+size][+hints...]".  A hint beginning
    with "A" carries a permission signature and hex expiry timestamp;
    all other hints are kept verbatim in self.hints.
    """
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Locator had no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed ("md5[+size]")."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # value is a hex Unix timestamp (1-8 hex digits), stored as a
        # naive UTC datetime.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<hex expiry>" hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an "A<sig>@<expiry>" hint into perm_sig/perm_expiry.

        BUG FIX: a malformed hint (no '@') makes the tuple unpacking
        raise ValueError, not IndexError, so the original
        "except IndexError" never fired and callers got an unhelpful
        unpacking error.  Split before assigning so hex-validation
        errors from the property setters keep their own messages.
        """
        try:
            sig, expiry = s[1:].split('@', 1)
        except ValueError:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired as of as_of_dt.

        perm_expiry is derived from a UTC Unix timestamp, so the default
        comparison point must be UTC now.  BUG FIX: the original used
        local-time datetime.now(), which mis-evaluates expiry in any
        non-UTC timezone.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
106
107
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Snapshot of the configuration tuple used to build the current
    # global client; a mismatch triggers a rebuild.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if config changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        stale = (global_client_object is None) or (cls._last_key != key)
        if stale:
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the global client (deprecated)."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the global client (deprecated)."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
142
class KeepBlockCache(object):
    """In-memory cache of Keep block contents, most-recently-used first.

    Readers that find a block already being fetched share its CacheSlot
    and block in CacheSlot.get() until the fetching thread calls set().
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max          # soft cap on total cached bytes
        self._cache = []                    # CacheSlot list, MRU at index 0
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block's content, filled exactly once via set()."""
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the content is available, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block content and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Cached byte count (0 while the slot is unfilled)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                evicted = False
                # Evict the least-recently-used slot that is ready.
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        # BUG FIX: subtract the evicted slot's size instead
                        # of re-summing every slot per eviction (the
                        # original recomputation made eviction quadratic).
                        sm -= self._cache[i].size()
                        del self._cache[i]
                        evicted = True
                        break
                if not evicted:
                    # BUG FIX: if no slot is ready, nothing can be evicted;
                    # the original looped forever here because the sum
                    # never changed.
                    break

    def _get(self, locator):
        # Linear scan for the locator; caller must hold self._cache_lock.
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # Move the hit to the front (MRU position).
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the CacheSlot for locator, or None if not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
212
213 class KeepClient(object):
214
215     # Default Keep server connection timeout:  2 seconds
216     # Default Keep server read timeout:      300 seconds
217     # Default Keep proxy connection timeout:  20 seconds
218     # Default Keep proxy read timeout:       300 seconds
219     DEFAULT_TIMEOUT = (2, 300)
220     DEFAULT_PROXY_TIMEOUT = (20, 300)
221
    class ThreadLimiter(object):
        """Limit the number of threads writing to Keep at once.

        This ensures that only a number of writer threads that could
        potentially achieve the desired replication level run at once.
        Once the desired replication level is achieved, queued threads
        are instructed not to run.

        Should be used in a "with" block.
        """
        def __init__(self, want_copies, max_service_replicas):
            # Number of threads that have passed __enter__ so far; used
            # to honor the ordering requested via set_sequence().
            self._started = 0
            self._want_copies = want_copies
            # Total replicas reported stored so far (see save_response()).
            self._done = 0
            self._response = None
            self._start_lock = threading.Condition()
            # One writer at a time suffices when a single service can store
            # all wanted copies (or its capacity is unknown); otherwise allow
            # ceil(want_copies / max_service_replicas) concurrent writers.
            if (not max_service_replicas) or (max_service_replicas >= want_copies):
                max_threads = 1
            else:
                max_threads = math.ceil(float(want_copies) / max_service_replicas)
            _logger.debug("Limiter max threads is %d", max_threads)
            self._todo_lock = threading.Semaphore(max_threads)
            self._done_lock = threading.Lock()
            # Per-thread storage for the optional start-sequence number.
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            # Block until a writer slot is free (semaphore bounds concurrency).
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            # Release the writer slot regardless of success or exception.
            self._todo_lock.release()

        def set_sequence(self, sequence):
            # Ask __enter__ to delay this thread until `sequence` other
            # threads have already started.
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should write to Keep.
            Return false otherwise.
            """
            with self._done_lock:
                return (self._done < self._want_copies)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server, and the number of replicas it stored.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """Return the body from the response to a PUT request."""
            with self._done_lock:
                return self._response

        def done(self):
            """Return the total number of replicas successfully stored."""
            with self._done_lock:
                return self._done
292
293
294     class KeepService(object):
295         """Make requests to a single Keep service, and track results.
296
297         A KeepService is intended to last long enough to perform one
298         transaction (GET or PUT) against one Keep service. This can
299         involve calling either get() or put() multiple times in order
300         to retry after transient failures. However, calling both get()
301         and put() on a single instance -- or using the same instance
302         to access two different Keep services -- will not produce
303         sensible behavior.
304         """
305
306         HTTP_ERRORS = (
307             socket.error,
308             ssl.SSLError,
309             arvados.errors.HttpError,
310         )
311
312         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
313             self.root = root
314             self._user_agent_pool = user_agent_pool
315             self._result = {'error': None}
316             self._usable = True
317             self._session = None
318             self.get_headers = {'Accept': 'application/octet-stream'}
319             self.get_headers.update(headers)
320             self.put_headers = headers
321
322         def usable(self):
323             """Is it worth attempting a request?"""
324             return self._usable
325
326         def finished(self):
327             """Did the request succeed or encounter permanent failure?"""
328             return self._result['error'] == False or not self._usable
329
330         def last_result(self):
331             return self._result
332
333         def _get_user_agent(self):
334             try:
335                 return self._user_agent_pool.get(False)
336             except Queue.Empty:
337                 return pycurl.Curl()
338
339         def _put_user_agent(self, ua):
340             try:
341                 ua.reset()
342                 self._user_agent_pool.put(ua, False)
343             except:
344                 ua.close()
345
346         @staticmethod
347         def _socket_open(family, socktype, protocol, address=None):
348             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
349             s = socket.socket(family, socktype, protocol)
350             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
351             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
352             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
353             return s
354
355         def get(self, locator, timeout=None):
356             # locator is a KeepLocator object.
357             url = self.root + str(locator)
358             _logger.debug("Request: GET %s", url)
359             curl = self._get_user_agent()
360             try:
361                 with timer.Timer() as t:
362                     self._headers = {}
363                     response_body = cStringIO.StringIO()
364                     curl.setopt(pycurl.NOSIGNAL, 1)
365                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
366                     curl.setopt(pycurl.URL, url.encode('utf-8'))
367                     curl.setopt(pycurl.HTTPHEADER, [
368                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
369                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
370                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
371                     self._setcurltimeouts(curl, timeout)
372                     try:
373                         curl.perform()
374                     except Exception as e:
375                         raise arvados.errors.HttpError(0, str(e))
376                     self._result = {
377                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
378                         'body': response_body.getvalue(),
379                         'headers': self._headers,
380                         'error': False,
381                     }
382                 ok = retry.check_http_response_success(self._result['status_code'])
383                 if not ok:
384                     self._result['error'] = arvados.errors.HttpError(
385                         self._result['status_code'],
386                         self._headers.get('x-status-line', 'Error'))
387             except self.HTTP_ERRORS as e:
388                 self._result = {
389                     'error': e,
390                 }
391                 ok = False
392             self._usable = ok != False
393             if self._result.get('status_code', None):
394                 # The client worked well enough to get an HTTP status
395                 # code, so presumably any problems are just on the
396                 # server side and it's OK to reuse the client.
397                 self._put_user_agent(curl)
398             else:
399                 # Don't return this client to the pool, in case it's
400                 # broken.
401                 curl.close()
402             if not ok:
403                 _logger.debug("Request fail: GET %s => %s: %s",
404                               url, type(self._result['error']), str(self._result['error']))
405                 return None
406             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
407                          self._result['status_code'],
408                          len(self._result['body']),
409                          t.msecs,
410                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
411             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
412             if resp_md5 != locator.md5sum:
413                 _logger.warning("Checksum fail: md5(%s) = %s",
414                                 url, resp_md5)
415                 self._result['error'] = arvados.errors.HttpError(
416                     0, 'Checksum fail')
417                 return None
418             return self._result['body']
419
420         def put(self, hash_s, body, timeout=None):
421             url = self.root + hash_s
422             _logger.debug("Request: PUT %s", url)
423             curl = self._get_user_agent()
424             try:
425                 self._headers = {}
426                 body_reader = cStringIO.StringIO(body)
427                 response_body = cStringIO.StringIO()
428                 curl.setopt(pycurl.NOSIGNAL, 1)
429                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
430                 curl.setopt(pycurl.URL, url.encode('utf-8'))
431                 # Using UPLOAD tells cURL to wait for a "go ahead" from the
432                 # Keep server (in the form of a HTTP/1.1 "100 Continue"
433                 # response) instead of sending the request body immediately.
434                 # This allows the server to reject the request if the request
435                 # is invalid or the server is read-only, without waiting for
436                 # the client to send the entire block.
437                 curl.setopt(pycurl.UPLOAD, True)
438                 curl.setopt(pycurl.INFILESIZE, len(body))
439                 curl.setopt(pycurl.READFUNCTION, body_reader.read)
440                 curl.setopt(pycurl.HTTPHEADER, [
441                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
442                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
443                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
444                 self._setcurltimeouts(curl, timeout)
445                 try:
446                     curl.perform()
447                 except Exception as e:
448                     raise arvados.errors.HttpError(0, str(e))
449                 self._result = {
450                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
451                     'body': response_body.getvalue(),
452                     'headers': self._headers,
453                     'error': False,
454                 }
455                 ok = retry.check_http_response_success(self._result['status_code'])
456                 if not ok:
457                     self._result['error'] = arvados.errors.HttpError(
458                         self._result['status_code'],
459                         self._headers.get('x-status-line', 'Error'))
460             except self.HTTP_ERRORS as e:
461                 self._result = {
462                     'error': e,
463                 }
464                 ok = False
465             self._usable = ok != False # still usable if ok is True or None
466             if self._result.get('status_code', None):
467                 # Client is functional. See comment in get().
468                 self._put_user_agent(curl)
469             else:
470                 curl.close()
471             if not ok:
472                 _logger.debug("Request fail: PUT %s => %s: %s",
473                               url, type(self._result['error']), str(self._result['error']))
474                 return False
475             return True
476
477         def _setcurltimeouts(self, curl, timeouts):
478             if not timeouts:
479                 return
480             elif isinstance(timeouts, tuple):
481                 conn_t, xfer_t = timeouts
482             else:
483                 conn_t, xfer_t = (timeouts, timeouts)
484             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
485             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
486
487         def _headerfunction(self, header_line):
488             header_line = header_line.decode('iso-8859-1')
489             if ':' in header_line:
490                 name, value = header_line.split(':', 1)
491                 name = name.strip().lower()
492                 value = value.strip()
493             elif self._headers:
494                 name = self._lastheadername
495                 value = self._headers[name] + ' ' + header_line.strip()
496             elif header_line.startswith('HTTP/'):
497                 name = 'x-status-line'
498                 value = header_line
499             else:
500                 _logger.error("Unexpected header line: %s", header_line)
501                 return
502             self._lastheadername = name
503             self._headers[name] = value
504             # Returning None implies all bytes were written
505
506
507     class KeepWriterThread(threading.Thread):
508         """
509         Write a blob of data to the given Keep server. On success, call
510         save_response() of the given ThreadLimiter to save the returned
511         locator.
512         """
513         def __init__(self, keep_service, **kwargs):
514             super(KeepClient.KeepWriterThread, self).__init__()
515             self.service = keep_service
516             self.args = kwargs
517             self._success = False
518
519         def success(self):
520             return self._success
521
522         def run(self):
523             limiter = self.args['thread_limiter']
524             sequence = self.args['thread_sequence']
525             if sequence is not None:
526                 limiter.set_sequence(sequence)
527             with limiter:
528                 if not limiter.shall_i_proceed():
529                     # My turn arrived, but the job has been done without
530                     # me.
531                     return
532                 self.run_with_limiter(limiter)
533
534         def run_with_limiter(self, limiter):
535             if self.service.finished():
536                 return
537             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
538                           str(threading.current_thread()),
539                           self.args['data_hash'],
540                           len(self.args['data']),
541                           self.args['service_root'])
542             self._success = bool(self.service.put(
543                 self.args['data_hash'],
544                 self.args['data'],
545                 timeout=self.args.get('timeout', None)))
546             result = self.service.last_result()
547             if self._success:
548                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
549                               str(threading.current_thread()),
550                               self.args['data_hash'],
551                               len(self.args['data']),
552                               self.args['service_root'])
553                 # Tick the 'done' counter for the number of replica
554                 # reported stored by the server, for the case that
555                 # we're talking to a proxy or other backend that
556                 # stores to multiple copies for us.
557                 try:
558                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
559                 except (KeyError, ValueError):
560                     replicas_stored = 1
561                 limiter.save_response(result['body'].strip(), replicas_stored)
562             elif result.get('status_code', None):
563                 _logger.debug("Request fail: PUT %s => %s %s",
564                               self.args['data_hash'],
565                               result['status_code'],
566                               result['body'])
567
568
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of two floats is interpreted as
          (connection_timeout, read_timeout): see
          http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
          Because timeouts are often a result of transient server load, the
          actual connection timeout will be increased by a factor of two on
          each retry.
          Default: (2, 300).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of two floats is interpreted as
          (connection_timeout, read_timeout). The behavior described
          above for adjusting connection timeouts on retry also applies.
          Default: (20, 300).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :block_cache:
          A KeepBlockCache to use for caching fetched blocks.  If not
          provided, a new KeepBlockCache of default size is created.

        :session:
          NOTE(review): accepted but not referenced anywhere in this
          constructor as written -- presumably retained for backward
          compatibility; confirm before relying on it.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        # Resolve the API token: an explicit argument wins; otherwise take
        # it from the API client, or from configuration when no client was
        # given.  Supplying both api_client and api_token is an error.
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of idle pycurl handles shared by this client's KeepService
        # instances.
        self._user_agent_pool = Queue.LifoQueue()

        if local_store:
            # Local-store mode: replace get/put with file-based versions
            # that never contact a Keep service.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Proxy mode: use a single static service entry and never
                # ask the API server for a service list.
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                # Service lists start empty and are filled lazily by
                # build_services_list().
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
675
676     def current_timeout(self, attempt_number):
677         """Return the appropriate timeout to use for this client.
678
679         The proxy timeout setting if the backend service is currently a proxy,
680         the regular timeout setting otherwise.  The `attempt_number` indicates
681         how many times the operation has been tried already (starting from 0
682         for the first try), and scales the connection timeout portion of the
683         return value accordingly.
684
685         """
686         # TODO(twp): the timeout should be a property of a
687         # KeepService, not a KeepClient. See #4488.
688         t = self.proxy_timeout if self.using_proxy else self.timeout
689         return (t[0] * (1 << attempt_number), t[1])
690
691     def _any_nondisk_services(self, service_list):
692         return any(ks.get('service_type', 'disk') != 'disk'
693                    for ks in service_list)
694
    def build_services_list(self, force_rebuild=False):
        """Fetch and cache the list of accessible Keep services.

        No-op when a static (proxy-mode) list was configured, or when a
        list is already cached and force_rebuild is false.  Populates
        self._gateway_services, self._keep_services,
        self._writable_services, and self.max_replicas_per_service.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Non-gateway services are candidates for ordinary GET/PUT;
            # the writable subset additionally accepts PUTs.
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
737
738     def _service_weight(self, data_hash, service_uuid):
739         """Compute the weight of a Keep service endpoint for a data
740         block with a known hash.
741
742         The weight is md5(h + u) where u is the last 15 characters of
743         the service endpoint's UUID.
744         """
745         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
746
747     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
748         """Return an array of Keep service endpoints, in the order in
749         which they should be probed when reading or writing data with
750         the given hash+hints.
751         """
752         self.build_services_list(force_rebuild)
753
754         sorted_roots = []
755         # Use the services indicated by the given +K@... remote
756         # service hints, if any are present and can be resolved to a
757         # URI.
758         for hint in locator.hints:
759             if hint.startswith('K@'):
760                 if len(hint) == 7:
761                     sorted_roots.append(
762                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
763                 elif len(hint) == 29:
764                     svc = self._gateway_services.get(hint[2:])
765                     if svc:
766                         sorted_roots.append(svc['_service_root'])
767
768         # Sort the available local services by weight (heaviest first)
769         # for this locator, and return their service_roots (base URIs)
770         # in that order.
771         use_services = self._keep_services
772         if need_writable:
773             use_services = self._writable_services
774         self.using_proxy = self._any_nondisk_services(use_services)
775         sorted_roots.extend([
776             svc['_service_root'] for svc in sorted(
777                 use_services,
778                 reverse=True,
779                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
780         _logger.debug("{}: {}".format(locator, sorted_roots))
781         return sorted_roots
782
783     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
784         # roots_map is a dictionary, mapping Keep service root strings
785         # to KeepService objects.  Poll for Keep services, and add any
786         # new ones to roots_map.  Return the current list of local
787         # root strings.
788         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
789         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
790         for root in local_roots:
791             if root not in roots_map:
792                 roots_map[root] = self.KeepService(
793                     root, self._user_agent_pool, **headers)
794         return local_roots
795
796     @staticmethod
797     def _check_loop_result(result):
798         # KeepClient RetryLoops should save results as a 2-tuple: the
799         # actual result of the request, and the number of servers available
800         # to receive the request this round.
801         # This method returns True if there's a real result, False if
802         # there are no more servers available, otherwise None.
803         if isinstance(result, Exception):
804             return None
805         result, tried_server_count = result
806         if (result is not None) and (result is not False):
807             return True
808         elif tried_server_count < 1:
809             _logger.info("No more Keep services to try; giving up")
810             return False
811         else:
812             return None
813
814     def get_from_cache(self, loc):
815         """Fetch a block only if is in the cache, otherwise return None."""
816         slot = self.block_cache.get(loc)
817         if slot is not None and slot.ready.is_set():
818             return slot.get()
819         else:
820             return None
821
822     @retry.retry_method
823     def get(self, loc_s, num_retries=None):
824         """Get data from Keep.
825
826         This method fetches one or more blocks of data from Keep.  It
827         sends a request each Keep service registered with the API
828         server (or the proxy provided when this client was
829         instantiated), then each service named in location hints, in
830         sequence.  As soon as one service provides the data, it's
831         returned.
832
833         Arguments:
834         * loc_s: A string of one or more comma-separated locators to fetch.
835           This method returns the concatenation of these blocks.
836         * num_retries: The number of times to retry GET requests to
837           *each* Keep server if it returns temporary failures, with
838           exponential backoff.  Note that, in each loop, the method may try
839           to fetch data from every available Keep service, along with any
840           that are named in location hints in the locator.  The default value
841           is set when the KeepClient is initialized.
842         """
843         if ',' in loc_s:
844             return ''.join(self.get(x) for x in loc_s.split(','))
845         locator = KeepLocator(loc_s)
846         slot, first = self.block_cache.reserve_cache(locator.md5sum)
847         if not first:
848             v = slot.get()
849             return v
850
851         # If the locator has hints specifying a prefix (indicating a
852         # remote keepproxy) or the UUID of a local gateway service,
853         # read data from the indicated service(s) instead of the usual
854         # list of local disk services.
855         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
856                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
857         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
858                            for hint in locator.hints if (
859                                    hint.startswith('K@') and
860                                    len(hint) == 29 and
861                                    self._gateway_services.get(hint[2:])
862                                    )])
863         # Map root URLs to their KeepService objects.
864         roots_map = {
865             root: self.KeepService(root, self._user_agent_pool)
866             for root in hint_roots
867         }
868
869         # See #3147 for a discussion of the loop implementation.  Highlights:
870         # * Refresh the list of Keep services after each failure, in case
871         #   it's being updated.
872         # * Retry until we succeed, we're out of retries, or every available
873         #   service has returned permanent failure.
874         sorted_roots = []
875         roots_map = {}
876         blob = None
877         loop = retry.RetryLoop(num_retries, self._check_loop_result,
878                                backoff_start=2)
879         for tries_left in loop:
880             try:
881                 sorted_roots = self.map_new_services(
882                     roots_map, locator,
883                     force_rebuild=(tries_left < num_retries),
884                     need_writable=False)
885             except Exception as error:
886                 loop.save_result(error)
887                 continue
888
889             # Query KeepService objects that haven't returned
890             # permanent failure, in our specified shuffle order.
891             services_to_try = [roots_map[root]
892                                for root in sorted_roots
893                                if roots_map[root].usable()]
894             for keep_service in services_to_try:
895                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
896                 if blob is not None:
897                     break
898             loop.save_result((blob, len(services_to_try)))
899
900         # Always cache the result, then return it if we succeeded.
901         slot.set(blob)
902         self.block_cache.cap_cache()
903         if loop.success():
904             return blob
905
906         # Q: Including 403 is necessary for the Keep tests to continue
907         # passing, but maybe they should expect KeepReadError instead?
908         not_founds = sum(1 for key in sorted_roots
909                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
910         service_errors = ((key, roots_map[key].last_result()['error'])
911                           for key in sorted_roots)
912         if not roots_map:
913             raise arvados.errors.KeepReadError(
914                 "failed to read {}: no Keep services available ({})".format(
915                     loc_s, loop.last_result()))
916         elif not_founds == len(sorted_roots):
917             raise arvados.errors.NotFoundError(
918                 "{} not found".format(loc_s), service_errors)
919         else:
920             raise arvados.errors.KeepReadError(
921                 "failed to read {}".format(loc_s), service_errors, label="service")
922
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        # Keep expects raw bytes: encode unicode input, reject anything
        # that is not a (byte) string.
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Nothing to upload, but the locator is still well-defined.
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        # Retry until enough copies are written, retries run out, or no
        # services remain to try (see _check_loop_result).
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Start one writer thread per service that hasn't already
            # finished; the limiter stops launching uploads once enough
            # copies have been stored.
            thread_limiter = KeepClient.ThreadLimiter(
                copies, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            # Success for this round means we stored at least `copies`
            # replicas across the threads we ran.
            loop.save_result((thread_limiter.done() >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
1003
1004     def local_store_put(self, data, copies=1, num_retries=None):
1005         """A stub for put().
1006
1007         This method is used in place of the real put() method when
1008         using local storage (see constructor's local_store argument).
1009
1010         copies and num_retries arguments are ignored: they are here
1011         only for the sake of offering the same call signature as
1012         put().
1013
1014         Data stored this way can be retrieved via local_store_get().
1015         """
1016         md5 = hashlib.md5(data).hexdigest()
1017         locator = '%s+%d' % (md5, len(data))
1018         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1019             f.write(data)
1020         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1021                   os.path.join(self.local_store, md5))
1022         return locator
1023
1024     def local_store_get(self, loc_s, num_retries=None):
1025         """Companion to local_store_put()."""
1026         try:
1027             locator = KeepLocator(loc_s)
1028         except ValueError:
1029             raise arvados.errors.NotFoundError(
1030                 "Invalid data locator: '%s'" % loc_s)
1031         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1032             return ''
1033         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1034             return f.read()
1035
1036     def is_cached(self, locator):
1037         return self.block_cache.reserve_cache(expect_hash)