ee3419c88b5da59839897b4357e73352484ff9af
[arvados.git] / sdk / python / arvados / keep.py
1 import bz2
2 import datetime
3 import fcntl
4 import functools
5 import gflags
6 import hashlib
7 import json
8 import logging
9 import os
10 import pprint
11 import pycurl
12 import Queue
13 import re
14 import socket
15 import ssl
16 import string
17 import cStringIO
18 import subprocess
19 import sys
20 import threading
21 import time
22 import timer
23 import types
24 import UserDict
25 import util
26 import zlib
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33
34 _logger = logging.getLogger('arvados.keep')
35 global_client_object = None
36
37
38 class KeepLocator(object):
39     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
40     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
41
42     def __init__(self, locator_str):
43         self.hints = []
44         self._perm_sig = None
45         self._perm_expiry = None
46         pieces = iter(locator_str.split('+'))
47         self.md5sum = next(pieces)
48         try:
49             self.size = int(next(pieces))
50         except StopIteration:
51             self.size = None
52         for hint in pieces:
53             if self.HINT_RE.match(hint) is None:
54                 raise ValueError("invalid hint format: {}".format(hint))
55             elif hint.startswith('A'):
56                 self.parse_permission_hint(hint)
57             else:
58                 self.hints.append(hint)
59
60     def __str__(self):
61         return '+'.join(
62             str(s) for s in [self.md5sum, self.size,
63                              self.permission_hint()] + self.hints
64             if s is not None)
65
66     def stripped(self):
67         if self.size is not None:
68             return "%s+%i" % (self.md5sum, self.size)
69         else:
70             return self.md5sum
71
72     def _make_hex_prop(name, length):
73         # Build and return a new property with the given name that
74         # must be a hex string of the given length.
75         data_name = '_{}'.format(name)
76         def getter(self):
77             return getattr(self, data_name)
78         def setter(self, hex_str):
79             if not arvados.util.is_hex(hex_str, length):
80                 raise ValueError("{} is not a {}-digit hex string: {}".
81                                  format(name, length, hex_str))
82             setattr(self, data_name, hex_str)
83         return property(getter, setter)
84
85     md5sum = _make_hex_prop('md5sum', 32)
86     perm_sig = _make_hex_prop('perm_sig', 40)
87
88     @property
89     def perm_expiry(self):
90         return self._perm_expiry
91
92     @perm_expiry.setter
93     def perm_expiry(self, value):
94         if not arvados.util.is_hex(value, 1, 8):
95             raise ValueError(
96                 "permission timestamp must be a hex Unix timestamp: {}".
97                 format(value))
98         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
99
100     def permission_hint(self):
101         data = [self.perm_sig, self.perm_expiry]
102         if None in data:
103             return None
104         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
105         return "A{}@{:08x}".format(*data)
106
107     def parse_permission_hint(self, s):
108         try:
109             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
110         except IndexError:
111             raise ValueError("bad permission hint {}".format(s))
112
113     def permission_expired(self, as_of_dt=None):
114         if self.perm_expiry is None:
115             return False
116         elif as_of_dt is None:
117             as_of_dt = datetime.datetime.now()
118         return self.perm_expiry <= as_of_dt
119
120
121 class Keep(object):
122     """Simple interface to a global KeepClient object.
123
124     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
125     own API client.  The global KeepClient will build an API client from the
126     current Arvados configuration, which may not match the one you built.
127     """
128     _last_key = None
129
130     @classmethod
131     def global_client_object(cls):
132         global global_client_object
133         # Previously, KeepClient would change its behavior at runtime based
134         # on these configuration settings.  We simulate that behavior here
135         # by checking the values and returning a new KeepClient if any of
136         # them have changed.
137         key = (config.get('ARVADOS_API_HOST'),
138                config.get('ARVADOS_API_TOKEN'),
139                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
140                config.get('ARVADOS_KEEP_PROXY'),
141                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
142                os.environ.get('KEEP_LOCAL_STORE'))
143         if (global_client_object is None) or (cls._last_key != key):
144             global_client_object = KeepClient()
145             cls._last_key = key
146         return global_client_object
147
148     @staticmethod
149     def get(locator, **kwargs):
150         return Keep.global_client_object().get(locator, **kwargs)
151
152     @staticmethod
153     def put(data, **kwargs):
154         return Keep.global_client_object().put(data, **kwargs)
155
156 class KeepBlockCache(object):
157     # Default RAM cache is 256MiB
158     def __init__(self, cache_max=(256 * 1024 * 1024)):
159         self.cache_max = cache_max
160         self._cache = []
161         self._cache_lock = threading.Lock()
162
163     class CacheSlot(object):
164         def __init__(self, locator):
165             self.locator = locator
166             self.ready = threading.Event()
167             self.content = None
168
169         def get(self):
170             self.ready.wait()
171             return self.content
172
173         def set(self, value):
174             self.content = value
175             self.ready.set()
176
177         def size(self):
178             if self.content is None:
179                 return 0
180             else:
181                 return len(self.content)
182
183     def cap_cache(self):
184         '''Cap the cache size to self.cache_max'''
185         with self._cache_lock:
186             # Select all slots except those where ready.is_set() and content is
187             # None (that means there was an error reading the block).
188             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
189             sm = sum([slot.size() for slot in self._cache])
190             while len(self._cache) > 0 and sm > self.cache_max:
191                 for i in xrange(len(self._cache)-1, -1, -1):
192                     if self._cache[i].ready.is_set():
193                         del self._cache[i]
194                         break
195                 sm = sum([slot.size() for slot in self._cache])
196
197     def _get(self, locator):
198         # Test if the locator is already in the cache
199         for i in xrange(0, len(self._cache)):
200             if self._cache[i].locator == locator:
201                 n = self._cache[i]
202                 if i != 0:
203                     # move it to the front
204                     del self._cache[i]
205                     self._cache.insert(0, n)
206                 return n
207         return None
208
209     def get(self, locator):
210         with self._cache_lock:
211             return self._get(locator)
212
213     def reserve_cache(self, locator):
214         '''Reserve a cache slot for the specified locator,
215         or return the existing slot.'''
216         with self._cache_lock:
217             n = self._get(locator)
218             if n:
219                 return n, False
220             else:
221                 # Add a new cache slot for the locator
222                 n = KeepBlockCache.CacheSlot(locator)
223                 self._cache.insert(0, n)
224                 return n, True
225
226 class KeepClient(object):
227
228     # Default Keep server connection timeout:  2 seconds
229     # Default Keep server read timeout:      300 seconds
230     # Default Keep proxy connection timeout:  20 seconds
231     # Default Keep proxy read timeout:       300 seconds
232     DEFAULT_TIMEOUT = (2, 300)
233     DEFAULT_PROXY_TIMEOUT = (20, 300)
234
235     class ThreadLimiter(object):
236         """
237         Limit the number of threads running at a given time to
238         {desired successes} minus {successes reported}. When successes
239         reported == desired, wake up the remaining threads and tell
240         them to quit.
241
242         Should be used in a "with" block.
243         """
244         def __init__(self, todo):
245             self._todo = todo
246             self._done = 0
247             self._response = None
248             self._todo_lock = threading.Semaphore(todo)
249             self._done_lock = threading.Lock()
250
251         def __enter__(self):
252             self._todo_lock.acquire()
253             return self
254
255         def __exit__(self, type, value, traceback):
256             self._todo_lock.release()
257
258         def shall_i_proceed(self):
259             """
260             Return true if the current thread should do stuff. Return
261             false if the current thread should just stop.
262             """
263             with self._done_lock:
264                 return (self._done < self._todo)
265
266         def save_response(self, response_body, replicas_stored):
267             """
268             Records a response body (a locator, possibly signed) returned by
269             the Keep server.  It is not necessary to save more than
270             one response, since we presume that any locator returned
271             in response to a successful request is valid.
272             """
273             with self._done_lock:
274                 self._done += replicas_stored
275                 self._response = response_body
276
277         def response(self):
278             """
279             Returns the body from the response to a PUT request.
280             """
281             with self._done_lock:
282                 return self._response
283
284         def done(self):
285             """
286             Return how many successes were reported.
287             """
288             with self._done_lock:
289                 return self._done
290
291
292     class KeepService(object):
293         """Make requests to a single Keep service, and track results.
294
295         A KeepService is intended to last long enough to perform one
296         transaction (GET or PUT) against one Keep service. This can
297         involve calling either get() or put() multiple times in order
298         to retry after transient failures. However, calling both get()
299         and put() on a single instance -- or using the same instance
300         to access two different Keep services -- will not produce
301         sensible behavior.
302         """
303
304         HTTP_ERRORS = (
305             socket.error,
306             ssl.SSLError,
307             arvados.errors.HttpError,
308         )
309
310         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
311             self.root = root
312             self._user_agent_pool = user_agent_pool
313             self._result = {'error': None}
314             self._usable = True
315             self._session = None
316             self.get_headers = {'Accept': 'application/octet-stream'}
317             self.get_headers.update(headers)
318             self.put_headers = headers
319
320         def usable(self):
321             """Is it worth attempting a request?"""
322             return self._usable
323
324         def finished(self):
325             """Did the request succeed or encounter permanent failure?"""
326             return self._result['error'] == False or not self._usable
327
328         def last_result(self):
329             return self._result
330
331         def _get_user_agent(self):
332             try:
333                 return self._user_agent_pool.get(False)
334             except Queue.Empty:
335                 return pycurl.Curl()
336
337         def _put_user_agent(self, ua):
338             try:
339                 ua.reset()
340                 self._user_agent_pool.put(ua, False)
341             except:
342                 ua.close()
343
344         @staticmethod
345         def _socket_open(family, socktype, protocol, address=None):
346             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
347             s = socket.socket(family, socktype, protocol)
348             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
349             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
350             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
351             return s
352
353         def get(self, locator, timeout=None):
354             # locator is a KeepLocator object.
355             url = self.root + str(locator)
356             _logger.debug("Request: GET %s", url)
357             curl = self._get_user_agent()
358             try:
359                 with timer.Timer() as t:
360                     self._headers = {}
361                     response_body = cStringIO.StringIO()
362                     curl.setopt(pycurl.NOSIGNAL, 1)
363                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
364                     curl.setopt(pycurl.URL, url.encode('utf-8'))
365                     curl.setopt(pycurl.HTTPHEADER, [
366                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
367                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
368                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
369                     self._setcurltimeouts(curl, timeout)
370                     try:
371                         curl.perform()
372                     except Exception as e:
373                         raise arvados.errors.HttpError(0, str(e))
374                     self._result = {
375                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
376                         'body': response_body.getvalue(),
377                         'headers': self._headers,
378                         'error': False,
379                     }
380                 ok = retry.check_http_response_success(self._result['status_code'])
381                 if not ok:
382                     self._result['error'] = arvados.errors.HttpError(
383                         self._result['status_code'],
384                         self._headers.get('x-status-line', 'Error'))
385             except self.HTTP_ERRORS as e:
386                 self._result = {
387                     'error': e,
388                 }
389                 ok = False
390             self._usable = ok != False
391             if self._result.get('status_code', None):
392                 # The client worked well enough to get an HTTP status
393                 # code, so presumably any problems are just on the
394                 # server side and it's OK to reuse the client.
395                 self._put_user_agent(curl)
396             else:
397                 # Don't return this client to the pool, in case it's
398                 # broken.
399                 curl.close()
400             if not ok:
401                 _logger.debug("Request fail: GET %s => %s: %s",
402                               url, type(self._result['error']), str(self._result['error']))
403                 return None
404             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
405                          self._result['status_code'],
406                          len(self._result['body']),
407                          t.msecs,
408                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
409             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
410             if resp_md5 != locator.md5sum:
411                 _logger.warning("Checksum fail: md5(%s) = %s",
412                                 url, resp_md5)
413                 self._result['error'] = arvados.errors.HttpError(
414                     0, 'Checksum fail')
415                 return None
416             return self._result['body']
417
418         def put(self, hash_s, body, timeout=None):
419             url = self.root + hash_s
420             _logger.debug("Request: PUT %s", url)
421             curl = self._get_user_agent()
422             try:
423                 self._headers = {}
424                 body_reader = cStringIO.StringIO(body)
425                 response_body = cStringIO.StringIO()
426                 curl.setopt(pycurl.NOSIGNAL, 1)
427                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
428                 curl.setopt(pycurl.URL, url.encode('utf-8'))
429                 # Using UPLOAD tells cURL to wait for a "go ahead" from the
430                 # Keep server (in the form of a HTTP/1.1 "100 Continue"
431                 # response) instead of sending the request body immediately.
432                 # This allows the server to reject the request if the request
433                 # is invalid or the server is read-only, without waiting for
434                 # the client to send the entire block.
435                 curl.setopt(pycurl.UPLOAD, True)
436                 curl.setopt(pycurl.INFILESIZE, len(body))
437                 curl.setopt(pycurl.READFUNCTION, body_reader.read)
438                 curl.setopt(pycurl.HTTPHEADER, [
439                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
440                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
441                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
442                 self._setcurltimeouts(curl, timeout)
443                 try:
444                     curl.perform()
445                 except Exception as e:
446                     raise arvados.errors.HttpError(0, str(e))
447                 self._result = {
448                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
449                     'body': response_body.getvalue(),
450                     'headers': self._headers,
451                     'error': False,
452                 }
453                 ok = retry.check_http_response_success(self._result['status_code'])
454                 if not ok:
455                     self._result['error'] = arvados.errors.HttpError(
456                         self._result['status_code'],
457                         self._headers.get('x-status-line', 'Error'))
458             except self.HTTP_ERRORS as e:
459                 self._result = {
460                     'error': e,
461                 }
462                 ok = False
463             self._usable = ok != False # still usable if ok is True or None
464             if self._result.get('status_code', None):
465                 # Client is functional. See comment in get().
466                 self._put_user_agent(curl)
467             else:
468                 curl.close()
469             if not ok:
470                 _logger.debug("Request fail: PUT %s => %s: %s",
471                               url, type(self._result['error']), str(self._result['error']))
472                 return False
473             return True
474
475         def _setcurltimeouts(self, curl, timeouts):
476             if not timeouts:
477                 return
478             elif isinstance(timeouts, tuple):
479                 conn_t, xfer_t = timeouts
480             else:
481                 conn_t, xfer_t = (timeouts, timeouts)
482             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
483             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
484
485         def _headerfunction(self, header_line):
486             header_line = header_line.decode('iso-8859-1')
487             if ':' in header_line:
488                 name, value = header_line.split(':', 1)
489                 name = name.strip().lower()
490                 value = value.strip()
491             elif self._headers:
492                 name = self._lastheadername
493                 value = self._headers[name] + ' ' + header_line.strip()
494             elif header_line.startswith('HTTP/'):
495                 name = 'x-status-line'
496                 value = header_line
497             else:
498                 _logger.error("Unexpected header line: %s", header_line)
499                 return
500             self._lastheadername = name
501             self._headers[name] = value
502             # Returning None implies all bytes were written
503
504
505     class KeepWriterThread(threading.Thread):
506         """
507         Write a blob of data to the given Keep server. On success, call
508         save_response() of the given ThreadLimiter to save the returned
509         locator.
510         """
511         def __init__(self, keep_service, **kwargs):
512             super(KeepClient.KeepWriterThread, self).__init__()
513             self.service = keep_service
514             self.args = kwargs
515             self._success = False
516
517         def success(self):
518             return self._success
519
520         def run(self):
521             with self.args['thread_limiter'] as limiter:
522                 if not limiter.shall_i_proceed():
523                     # My turn arrived, but the job has been done without
524                     # me.
525                     return
526                 self.run_with_limiter(limiter)
527
528         def run_with_limiter(self, limiter):
529             if self.service.finished():
530                 return
531             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
532                           str(threading.current_thread()),
533                           self.args['data_hash'],
534                           len(self.args['data']),
535                           self.args['service_root'])
536             self._success = bool(self.service.put(
537                 self.args['data_hash'],
538                 self.args['data'],
539                 timeout=self.args.get('timeout', None)))
540             result = self.service.last_result()
541             if self._success:
542                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
543                               str(threading.current_thread()),
544                               self.args['data_hash'],
545                               len(self.args['data']),
546                               self.args['service_root'])
547                 # Tick the 'done' counter for the number of replica
548                 # reported stored by the server, for the case that
549                 # we're talking to a proxy or other backend that
550                 # stores to multiple copies for us.
551                 try:
552                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
553                 except (KeyError, ValueError):
554                     replicas_stored = 1
555                 limiter.save_response(result['body'].strip(), replicas_stored)
556             elif result.get('status_code', None):
557                 _logger.debug("Request fail: PUT %s => %s %s",
558                               self.args['data_hash'],
559                               result['status_code'],
560                               result['body'])
561
562
563     def __init__(self, api_client=None, proxy=None,
564                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
565                  api_token=None, local_store=None, block_cache=None,
566                  num_retries=0, session=None):
567         """Initialize a new KeepClient.
568
569         Arguments:
570         :api_client:
571           The API client to use to find Keep services.  If not
572           provided, KeepClient will build one from available Arvados
573           configuration.
574
575         :proxy:
576           If specified, this KeepClient will send requests to this Keep
577           proxy.  Otherwise, KeepClient will fall back to the setting of the
578           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
579           KeepClient does not use a proxy, pass in an empty string.
580
581         :timeout:
582           The initial timeout (in seconds) for HTTP requests to Keep
583           non-proxy servers.  A tuple of two floats is interpreted as
584           (connection_timeout, read_timeout): see
585           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
586           Because timeouts are often a result of transient server load, the
587           actual connection timeout will be increased by a factor of two on
588           each retry.
589           Default: (2, 300).
590
591         :proxy_timeout:
592           The initial timeout (in seconds) for HTTP requests to
593           Keep proxies. A tuple of two floats is interpreted as
594           (connection_timeout, read_timeout). The behavior described
595           above for adjusting connection timeouts on retry also applies.
596           Default: (20, 300).
597
598         :api_token:
599           If you're not using an API client, but only talking
600           directly to a Keep proxy, this parameter specifies an API token
601           to authenticate Keep requests.  It is an error to specify both
602           api_client and api_token.  If you specify neither, KeepClient
603           will use one available from the Arvados configuration.
604
605         :local_store:
606           If specified, this KeepClient will bypass Keep
607           services, and save data to the named directory.  If unspecified,
608           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
609           environment variable.  If you want to ensure KeepClient does not
610           use local storage, pass in an empty string.  This is primarily
611           intended to mock a server for testing.
612
613         :num_retries:
614           The default number of times to retry failed requests.
615           This will be used as the default num_retries value when get() and
616           put() are called.  Default 0.
617         """
618         self.lock = threading.Lock()
619         if proxy is None:
620             proxy = config.get('ARVADOS_KEEP_PROXY')
621         if api_token is None:
622             if api_client is None:
623                 api_token = config.get('ARVADOS_API_TOKEN')
624             else:
625                 api_token = api_client.api_token
626         elif api_client is not None:
627             raise ValueError(
628                 "can't build KeepClient with both API client and token")
629         if local_store is None:
630             local_store = os.environ.get('KEEP_LOCAL_STORE')
631
632         self.block_cache = block_cache if block_cache else KeepBlockCache()
633         self.timeout = timeout
634         self.proxy_timeout = proxy_timeout
635         self._user_agent_pool = Queue.LifoQueue()
636
637         if local_store:
638             self.local_store = local_store
639             self.get = self.local_store_get
640             self.put = self.local_store_put
641         else:
642             self.num_retries = num_retries
643             if proxy:
644                 if not proxy.endswith('/'):
645                     proxy += '/'
646                 self.api_token = api_token
647                 self._gateway_services = {}
648                 self._keep_services = [{
649                     'uuid': 'proxy',
650                     '_service_root': proxy,
651                     }]
652                 self._writable_services = self._keep_services
653                 self.using_proxy = True
654                 self._static_services_list = True
655                 self.thread_count = None
656             else:
657                 # It's important to avoid instantiating an API client
658                 # unless we actually need one, for testing's sake.
659                 if api_client is None:
660                     api_client = arvados.api('v1')
661                 self.api_client = api_client
662                 self.api_token = api_client.api_token
663                 self._gateway_services = {}
664                 self._keep_services = None
665                 self._writable_services = None
666                 self.using_proxy = None
667                 self._static_services_list = False
668                 self.thread_count = None
669
670     def current_timeout(self, attempt_number):
671         """Return the appropriate timeout to use for this client.
672
673         The proxy timeout setting if the backend service is currently a proxy,
674         the regular timeout setting otherwise.  The `attempt_number` indicates
675         how many times the operation has been tried already (starting from 0
676         for the first try), and scales the connection timeout portion of the
677         return value accordingly.
678
679         """
680         # TODO(twp): the timeout should be a property of a
681         # KeepService, not a KeepClient. See #4488.
682         t = self.proxy_timeout if self.using_proxy else self.timeout
683         return (t[0] * (1 << attempt_number), t[1])
684
685     def build_services_list(self, force_rebuild=False):
686         if (self._static_services_list or
687               (self._keep_services and not force_rebuild)):
688             return
689         with self.lock:
690             try:
691                 keep_services = self.api_client.keep_services().accessible()
692             except Exception:  # API server predates Keep services.
693                 keep_services = self.api_client.keep_disks().list()
694
695             accessible = keep_services.execute().get('items')
696             if not accessible:
697                 raise arvados.errors.NoKeepServersError()
698
699             # Precompute the base URI for each service.
700             for r in accessible:
701                 host = r['service_host']
702                 if not host.startswith('[') and host.find(':') >= 0:
703                     # IPv6 URIs must be formatted like http://[::1]:80/...
704                     host = '[' + host + ']'
705                 r['_service_root'] = "{}://{}:{:d}/".format(
706                     'https' if r['service_ssl_flag'] else 'http',
707                     host,
708                     r['service_port'])
709
710             # Gateway services are only used when specified by UUID,
711             # so there's nothing to gain by filtering them by
712             # service_type.
713             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
714             _logger.debug(str(self._gateway_services))
715
716             self._keep_services = [
717                 ks for ks in accessible
718                 if ks.get('service_type') in ['disk', 'proxy']]
719             self._writable_services = [
720                 ks for ks in accessible
721                 if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
722             _logger.debug(str(self._keep_services))
723
724             self.using_proxy = any(ks.get('service_type') == 'proxy'
725                                    for ks in self._keep_services)
726             # Use a thread_count of 1 if the service is not a disk
727             for ks in accessible:
728                 if ('disk' != ks.get('service_type')) and (True != ks.get('read_only')):
729                     self.thread_count = 1
730
731     def _service_weight(self, data_hash, service_uuid):
732         """Compute the weight of a Keep service endpoint for a data
733         block with a known hash.
734
735         The weight is md5(h + u) where u is the last 15 characters of
736         the service endpoint's UUID.
737         """
738         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
739
740     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
741         """Return an array of Keep service endpoints, in the order in
742         which they should be probed when reading or writing data with
743         the given hash+hints.
744         """
745         self.build_services_list(force_rebuild)
746
747         sorted_roots = []
748
749         # Use the services indicated by the given hints that are
750         # not size or authorization hints.
751         # If it is a K@ hint of size 7, it is a keepproxy
752         # Otherwise, expect the hint to be of len 29 and a uuid
753         # of a remote service that can be resolved to a URI.
754         for hint in locator.hints:
755             if not hint.startswith('A') and not hint[0].isdigit():
756                 if len(hint) == 7 and hint.startswith('K@'):
757                     sorted_roots.append(
758                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
759                 elif len(hint) == 29 and re.match(util.uuid_pattern, hint[2:]):
760                     svc = self._gateway_services.get(hint[2:])
761                     if svc:
762                         sorted_roots.append(svc['_service_root'])
763
764         # Sort the available local services by weight (heaviest first)
765         # for this locator, and return their service_roots (base URIs)
766         # in that order.
767         use_services = self._keep_services
768         if need_writable:
769           use_services = self._writable_services
770         sorted_roots.extend([
771             svc['_service_root'] for svc in sorted(
772                 use_services,
773                 reverse=True,
774                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
775         _logger.debug("{}: {}".format(locator, sorted_roots))
776         return sorted_roots
777
778     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
779         # roots_map is a dictionary, mapping Keep service root strings
780         # to KeepService objects.  Poll for Keep services, and add any
781         # new ones to roots_map.  Return the current list of local
782         # root strings.
783         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
784         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
785         for root in local_roots:
786             if root not in roots_map:
787                 roots_map[root] = self.KeepService(
788                     root, self._user_agent_pool, **headers)
789         return local_roots
790
791     @staticmethod
792     def _check_loop_result(result):
793         # KeepClient RetryLoops should save results as a 2-tuple: the
794         # actual result of the request, and the number of servers available
795         # to receive the request this round.
796         # This method returns True if there's a real result, False if
797         # there are no more servers available, otherwise None.
798         if isinstance(result, Exception):
799             return None
800         result, tried_server_count = result
801         if (result is not None) and (result is not False):
802             return True
803         elif tried_server_count < 1:
804             _logger.info("No more Keep services to try; giving up")
805             return False
806         else:
807             return None
808
809     def get_from_cache(self, loc):
810         """Fetch a block only if is in the cache, otherwise return None."""
811         slot = self.block_cache.get(loc)
812         if slot is not None and slot.ready.is_set():
813             return slot.get()
814         else:
815             return None
816
817     @retry.retry_method
818     def get(self, loc_s, num_retries=None):
819         """Get data from Keep.
820
821         This method fetches one or more blocks of data from Keep.  It
822         sends a request each Keep service registered with the API
823         server (or the proxy provided when this client was
824         instantiated), then each service named in location hints, in
825         sequence.  As soon as one service provides the data, it's
826         returned.
827
828         Arguments:
829         * loc_s: A string of one or more comma-separated locators to fetch.
830           This method returns the concatenation of these blocks.
831         * num_retries: The number of times to retry GET requests to
832           *each* Keep server if it returns temporary failures, with
833           exponential backoff.  Note that, in each loop, the method may try
834           to fetch data from every available Keep service, along with any
835           that are named in location hints in the locator.  The default value
836           is set when the KeepClient is initialized.
837         """
838         if ',' in loc_s:
839             return ''.join(self.get(x) for x in loc_s.split(','))
840         locator = KeepLocator(loc_s)
841         slot, first = self.block_cache.reserve_cache(locator.md5sum)
842         if not first:
843             v = slot.get()
844             return v
845
846         # If the locator has hints specifying a prefix (indicating a
847         # remote keepproxy) or the UUID of a local gateway service,
848         # read data from the indicated service(s) instead of the usual
849         # list of local disk services.
850         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
851                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
852         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
853                            for hint in locator.hints if (
854                                    hint.startswith('K@') and
855                                    len(hint) == 29 and
856                                    self._gateway_services.get(hint[2:])
857                                    )])
858         # Map root URLs to their KeepService objects.
859         roots_map = {
860             root: self.KeepService(root, self._user_agent_pool)
861             for root in hint_roots
862         }
863
864         # See #3147 for a discussion of the loop implementation.  Highlights:
865         # * Refresh the list of Keep services after each failure, in case
866         #   it's being updated.
867         # * Retry until we succeed, we're out of retries, or every available
868         #   service has returned permanent failure.
869         sorted_roots = []
870         roots_map = {}
871         blob = None
872         loop = retry.RetryLoop(num_retries, self._check_loop_result,
873                                backoff_start=2)
874         for tries_left in loop:
875             try:
876                 sorted_roots = self.map_new_services(
877                     roots_map, locator,
878                     force_rebuild=(tries_left < num_retries),
879                     need_writable=False)
880             except Exception as error:
881                 loop.save_result(error)
882                 continue
883
884             # Query KeepService objects that haven't returned
885             # permanent failure, in our specified shuffle order.
886             services_to_try = [roots_map[root]
887                                for root in sorted_roots
888                                if roots_map[root].usable()]
889             for keep_service in services_to_try:
890                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
891                 if blob is not None:
892                     break
893             loop.save_result((blob, len(services_to_try)))
894
895         # Always cache the result, then return it if we succeeded.
896         slot.set(blob)
897         self.block_cache.cap_cache()
898         if loop.success():
899             return blob
900
901         # Q: Including 403 is necessary for the Keep tests to continue
902         # passing, but maybe they should expect KeepReadError instead?
903         not_founds = sum(1 for key in sorted_roots
904                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
905         service_errors = ((key, roots_map[key].last_result()['error'])
906                           for key in sorted_roots)
907         if not roots_map:
908             raise arvados.errors.KeepReadError(
909                 "failed to read {}: no Keep services available ({})".format(
910                     loc_s, loop.last_result()))
911         elif not_founds == len(sorted_roots):
912             raise arvados.errors.NotFoundError(
913                 "{} not found".format(loc_s), service_errors)
914         else:
915             raise arvados.errors.KeepReadError(
916                 "failed to read {}".format(loc_s), service_errors, label="service")
917
918     @retry.retry_method
919     def put(self, data, copies=2, num_retries=None):
920         """Save data in Keep.
921
922         This method will get a list of Keep services from the API server, and
923         send the data to each one simultaneously in a new thread.  Once the
924         uploads are finished, if enough copies are saved, this method returns
925         the most recent HTTP response body.  If requests fail to upload
926         enough copies, this method raises KeepWriteError.
927
928         Arguments:
929         * data: The string of data to upload.
930         * copies: The number of copies that the user requires be saved.
931           Default 2.
932         * num_retries: The number of times to retry PUT requests to
933           *each* Keep server if it returns temporary failures, with
934           exponential backoff.  The default value is set when the
935           KeepClient is initialized.
936         """
937
938         if isinstance(data, unicode):
939             data = data.encode("ascii")
940         elif not isinstance(data, str):
941             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
942
943         data_hash = hashlib.md5(data).hexdigest()
944         loc_s = data_hash + '+' + str(len(data))
945         if copies < 1:
946             return loc_s
947         locator = KeepLocator(loc_s)
948
949         headers = {}
950         # Tell the proxy how many copies we want it to store
951         headers['X-Keep-Desired-Replication'] = str(copies)
952         roots_map = {}
953         thread_limiter = KeepClient.ThreadLimiter(1 if 1 == self.thread_count else copies)
954         loop = retry.RetryLoop(num_retries, self._check_loop_result,
955                                backoff_start=2)
956         for tries_left in loop:
957             try:
958                 local_roots = self.map_new_services(
959                     roots_map, locator,
960                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
961             except Exception as error:
962                 loop.save_result(error)
963                 continue
964
965             threads = []
966             for service_root, ks in roots_map.iteritems():
967                 if ks.finished():
968                     continue
969                 t = KeepClient.KeepWriterThread(
970                     ks,
971                     data=data,
972                     data_hash=data_hash,
973                     service_root=service_root,
974                     thread_limiter=thread_limiter,
975                     timeout=self.current_timeout(num_retries-tries_left))
976                 t.start()
977                 threads.append(t)
978             for t in threads:
979                 t.join()
980             loop.save_result((thread_limiter.done() >= copies, len(threads)))
981
982         if loop.success():
983             return thread_limiter.response()
984         if not roots_map:
985             raise arvados.errors.KeepWriteError(
986                 "failed to write {}: no Keep services available ({})".format(
987                     data_hash, loop.last_result()))
988         else:
989             service_errors = ((key, roots_map[key].last_result()['error'])
990                               for key in local_roots
991                               if roots_map[key].last_result()['error'])
992             raise arvados.errors.KeepWriteError(
993                 "failed to write {} (wanted {} copies but wrote {})".format(
994                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
995
996     def local_store_put(self, data, copies=1, num_retries=None):
997         """A stub for put().
998
999         This method is used in place of the real put() method when
1000         using local storage (see constructor's local_store argument).
1001
1002         copies and num_retries arguments are ignored: they are here
1003         only for the sake of offering the same call signature as
1004         put().
1005
1006         Data stored this way can be retrieved via local_store_get().
1007         """
1008         md5 = hashlib.md5(data).hexdigest()
1009         locator = '%s+%d' % (md5, len(data))
1010         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1011             f.write(data)
1012         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1013                   os.path.join(self.local_store, md5))
1014         return locator
1015
1016     def local_store_get(self, loc_s, num_retries=None):
1017         """Companion to local_store_put()."""
1018         try:
1019             locator = KeepLocator(loc_s)
1020         except ValueError:
1021             raise arvados.errors.NotFoundError(
1022                 "Invalid data locator: '%s'" % loc_s)
1023         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1024             return ''
1025         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1026             return f.read()
1027
1028     def is_cached(self, locator):
1029         return self.block_cache.reserve_cache(expect_hash)