Merge branch '5856-read-exact' closes #5856
[arvados.git] / sdk / python / arvados / keep.py
1 import bz2
2 import datetime
3 import fcntl
4 import functools
5 import gflags
6 import hashlib
7 import json
8 import logging
9 import os
10 import pprint
11 import pycurl
12 import Queue
13 import re
14 import socket
15 import ssl
16 import string
17 import StringIO
18 import subprocess
19 import sys
20 import threading
21 import time
22 import timer
23 import types
24 import UserDict
25 import zlib
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.retry as retry
31 import arvados.util
32
33 _logger = logging.getLogger('arvados.keep')
34 global_client_object = None
35
36
37 class KeepLocator(object):
38     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
39     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
40
41     def __init__(self, locator_str):
42         self.hints = []
43         self._perm_sig = None
44         self._perm_expiry = None
45         pieces = iter(locator_str.split('+'))
46         self.md5sum = next(pieces)
47         try:
48             self.size = int(next(pieces))
49         except StopIteration:
50             self.size = None
51         for hint in pieces:
52             if self.HINT_RE.match(hint) is None:
53                 raise ValueError("invalid hint format: {}".format(hint))
54             elif hint.startswith('A'):
55                 self.parse_permission_hint(hint)
56             else:
57                 self.hints.append(hint)
58
59     def __str__(self):
60         return '+'.join(
61             str(s) for s in [self.md5sum, self.size,
62                              self.permission_hint()] + self.hints
63             if s is not None)
64
65     def stripped(self):
66         if self.size is not None:
67             return "%s+%i" % (self.md5sum, self.size)
68         else:
69             return self.md5sum
70
71     def _make_hex_prop(name, length):
72         # Build and return a new property with the given name that
73         # must be a hex string of the given length.
74         data_name = '_{}'.format(name)
75         def getter(self):
76             return getattr(self, data_name)
77         def setter(self, hex_str):
78             if not arvados.util.is_hex(hex_str, length):
79                 raise ValueError("{} must be a {}-digit hex string: {}".
80                                  format(name, length, hex_str))
81             setattr(self, data_name, hex_str)
82         return property(getter, setter)
83
84     md5sum = _make_hex_prop('md5sum', 32)
85     perm_sig = _make_hex_prop('perm_sig', 40)
86
87     @property
88     def perm_expiry(self):
89         return self._perm_expiry
90
91     @perm_expiry.setter
92     def perm_expiry(self, value):
93         if not arvados.util.is_hex(value, 1, 8):
94             raise ValueError(
95                 "permission timestamp must be a hex Unix timestamp: {}".
96                 format(value))
97         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
98
99     def permission_hint(self):
100         data = [self.perm_sig, self.perm_expiry]
101         if None in data:
102             return None
103         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
104         return "A{}@{:08x}".format(*data)
105
106     def parse_permission_hint(self, s):
107         try:
108             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
109         except IndexError:
110             raise ValueError("bad permission hint {}".format(s))
111
112     def permission_expired(self, as_of_dt=None):
113         if self.perm_expiry is None:
114             return False
115         elif as_of_dt is None:
116             as_of_dt = datetime.datetime.now()
117         return self.perm_expiry <= as_of_dt
118
119
120 class Keep(object):
121     """Simple interface to a global KeepClient object.
122
123     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
124     own API client.  The global KeepClient will build an API client from the
125     current Arvados configuration, which may not match the one you built.
126     """
127     _last_key = None
128
129     @classmethod
130     def global_client_object(cls):
131         global global_client_object
132         # Previously, KeepClient would change its behavior at runtime based
133         # on these configuration settings.  We simulate that behavior here
134         # by checking the values and returning a new KeepClient if any of
135         # them have changed.
136         key = (config.get('ARVADOS_API_HOST'),
137                config.get('ARVADOS_API_TOKEN'),
138                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139                config.get('ARVADOS_KEEP_PROXY'),
140                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141                os.environ.get('KEEP_LOCAL_STORE'))
142         if (global_client_object is None) or (cls._last_key != key):
143             global_client_object = KeepClient()
144             cls._last_key = key
145         return global_client_object
146
147     @staticmethod
148     def get(locator, **kwargs):
149         return Keep.global_client_object().get(locator, **kwargs)
150
151     @staticmethod
152     def put(data, **kwargs):
153         return Keep.global_client_object().put(data, **kwargs)
154
155 class KeepBlockCache(object):
156     # Default RAM cache is 256MiB
157     def __init__(self, cache_max=(256 * 1024 * 1024)):
158         self.cache_max = cache_max
159         self._cache = []
160         self._cache_lock = threading.Lock()
161
162     class CacheSlot(object):
163         def __init__(self, locator):
164             self.locator = locator
165             self.ready = threading.Event()
166             self.content = None
167
168         def get(self):
169             self.ready.wait()
170             return self.content
171
172         def set(self, value):
173             self.content = value
174             self.ready.set()
175
176         def size(self):
177             if self.content is None:
178                 return 0
179             else:
180                 return len(self.content)
181
182     def cap_cache(self):
183         '''Cap the cache size to self.cache_max'''
184         with self._cache_lock:
185             # Select all slots except those where ready.is_set() and content is
186             # None (that means there was an error reading the block).
187             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
188             sm = sum([slot.size() for slot in self._cache])
189             while len(self._cache) > 0 and sm > self.cache_max:
190                 for i in xrange(len(self._cache)-1, -1, -1):
191                     if self._cache[i].ready.is_set():
192                         del self._cache[i]
193                         break
194                 sm = sum([slot.size() for slot in self._cache])
195
196     def _get(self, locator):
197         # Test if the locator is already in the cache
198         for i in xrange(0, len(self._cache)):
199             if self._cache[i].locator == locator:
200                 n = self._cache[i]
201                 if i != 0:
202                     # move it to the front
203                     del self._cache[i]
204                     self._cache.insert(0, n)
205                 return n
206         return None
207
208     def get(self, locator):
209         with self._cache_lock:
210             return self._get(locator)
211
212     def reserve_cache(self, locator):
213         '''Reserve a cache slot for the specified locator,
214         or return the existing slot.'''
215         with self._cache_lock:
216             n = self._get(locator)
217             if n:
218                 return n, False
219             else:
220                 # Add a new cache slot for the locator
221                 n = KeepBlockCache.CacheSlot(locator)
222                 self._cache.insert(0, n)
223                 return n, True
224
225 class KeepClient(object):
226
227     # Default Keep server connection timeout:  2 seconds
228     # Default Keep server read timeout:      300 seconds
229     # Default Keep proxy connection timeout:  20 seconds
230     # Default Keep proxy read timeout:       300 seconds
231     DEFAULT_TIMEOUT = (2, 300)
232     DEFAULT_PROXY_TIMEOUT = (20, 300)
233
234     class ThreadLimiter(object):
235         """
236         Limit the number of threads running at a given time to
237         {desired successes} minus {successes reported}. When successes
238         reported == desired, wake up the remaining threads and tell
239         them to quit.
240
241         Should be used in a "with" block.
242         """
243         def __init__(self, todo):
244             self._todo = todo
245             self._done = 0
246             self._response = None
247             self._todo_lock = threading.Semaphore(todo)
248             self._done_lock = threading.Lock()
249
250         def __enter__(self):
251             self._todo_lock.acquire()
252             return self
253
254         def __exit__(self, type, value, traceback):
255             self._todo_lock.release()
256
257         def shall_i_proceed(self):
258             """
259             Return true if the current thread should do stuff. Return
260             false if the current thread should just stop.
261             """
262             with self._done_lock:
263                 return (self._done < self._todo)
264
265         def save_response(self, response_body, replicas_stored):
266             """
267             Records a response body (a locator, possibly signed) returned by
268             the Keep server.  It is not necessary to save more than
269             one response, since we presume that any locator returned
270             in response to a successful request is valid.
271             """
272             with self._done_lock:
273                 self._done += replicas_stored
274                 self._response = response_body
275
276         def response(self):
277             """
278             Returns the body from the response to a PUT request.
279             """
280             with self._done_lock:
281                 return self._response
282
283         def done(self):
284             """
285             Return how many successes were reported.
286             """
287             with self._done_lock:
288                 return self._done
289
290
291     class KeepService(object):
292         """Make requests to a single Keep service, and track results.
293
294         A KeepService is intended to last long enough to perform one
295         transaction (GET or PUT) against one Keep service. This can
296         involve calling either get() or put() multiple times in order
297         to retry after transient failures. However, calling both get()
298         and put() on a single instance -- or using the same instance
299         to access two different Keep services -- will not produce
300         sensible behavior.
301         """
302
303         HTTP_ERRORS = (
304             socket.error,
305             ssl.SSLError,
306             arvados.errors.HttpError,
307         )
308
309         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
310             self.root = root
311             self._user_agent_pool = user_agent_pool
312             self._result = {'error': None}
313             self._usable = True
314             self._session = None
315             self.get_headers = {'Accept': 'application/octet-stream'}
316             self.get_headers.update(headers)
317             self.put_headers = headers
318
319         def usable(self):
320             """Is it worth attempting a request?"""
321             return self._usable
322
323         def finished(self):
324             """Did the request succeed or encounter permanent failure?"""
325             return self._result['error'] == False or not self._usable
326
327         def last_result(self):
328             return self._result
329
330         def _get_user_agent(self):
331             try:
332                 return self._user_agent_pool.get(False)
333             except Queue.Empty:
334                 return pycurl.Curl()
335
336         def _put_user_agent(self, ua):
337             try:
338                 ua.reset()
339                 self._user_agent_pool.put(ua, False)
340             except:
341                 ua.close()
342
343         @staticmethod
344         def _socket_open(family, socktype, protocol, address=None):
345             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
346             s = socket.socket(family, socktype, protocol)
347             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
348             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
349             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
350             return s
351
352         def get(self, locator, timeout=None):
353             # locator is a KeepLocator object.
354             url = self.root + str(locator)
355             _logger.debug("Request: GET %s", url)
356             curl = self._get_user_agent()
357             try:
358                 with timer.Timer() as t:
359                     self._headers = {}
360                     response_body = StringIO.StringIO()
361                     curl.setopt(pycurl.NOSIGNAL, 1)
362                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
363                     curl.setopt(pycurl.URL, url.encode('utf-8'))
364                     curl.setopt(pycurl.HTTPHEADER, [
365                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
366                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
367                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
368                     self._setcurltimeouts(curl, timeout)
369                     try:
370                         curl.perform()
371                     except Exception as e:
372                         raise arvados.errors.HttpError(0, str(e))
373                     self._result = {
374                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
375                         'body': response_body.getvalue(),
376                         'headers': self._headers,
377                         'error': False,
378                     }
379                 ok = retry.check_http_response_success(self._result['status_code'])
380                 if not ok:
381                     self._result['error'] = arvados.errors.HttpError(
382                         self._result['status_code'],
383                         self._headers.get('x-status-line', 'Error'))
384             except self.HTTP_ERRORS as e:
385                 self._result = {
386                     'error': e,
387                 }
388                 ok = False
389             self._usable = ok != False
390             if self._result.get('status_code', None):
391                 # The client worked well enough to get an HTTP status
392                 # code, so presumably any problems are just on the
393                 # server side and it's OK to reuse the client.
394                 self._put_user_agent(curl)
395             else:
396                 # Don't return this client to the pool, in case it's
397                 # broken.
398                 curl.close()
399             if not ok:
400                 _logger.debug("Request fail: GET %s => %s: %s",
401                               url, type(self._result['error']), str(self._result['error']))
402                 return None
403             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
404                          self._result['status_code'],
405                          len(self._result['body']),
406                          t.msecs,
407                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
408             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
409             if resp_md5 != locator.md5sum:
410                 _logger.warning("Checksum fail: md5(%s) = %s",
411                                 url, resp_md5)
412                 self._result['error'] = arvados.errors.HttpError(
413                     0, 'Checksum fail')
414                 return None
415             return self._result['body']
416
417         def put(self, hash_s, body, timeout=None):
418             url = self.root + hash_s
419             _logger.debug("Request: PUT %s", url)
420             curl = self._get_user_agent()
421             try:
422                 self._headers = {}
423                 response_body = StringIO.StringIO()
424                 curl.setopt(pycurl.NOSIGNAL, 1)
425                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
426                 curl.setopt(pycurl.URL, url.encode('utf-8'))
427                 curl.setopt(pycurl.POSTFIELDS, body)
428                 curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
429                 curl.setopt(pycurl.HTTPHEADER, [
430                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
431                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
432                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
433                 self._setcurltimeouts(curl, timeout)
434                 try:
435                     curl.perform()
436                 except Exception as e:
437                     raise arvados.errors.HttpError(0, str(e))
438                 self._result = {
439                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
440                     'body': response_body.getvalue(),
441                     'headers': self._headers,
442                     'error': False,
443                 }
444                 ok = retry.check_http_response_success(self._result['status_code'])
445                 if not ok:
446                     self._result['error'] = arvados.errors.HttpError(
447                         self._result['status_code'],
448                         self._headers.get('x-status-line', 'Error'))
449             except self.HTTP_ERRORS as e:
450                 self._result = {
451                     'error': e,
452                 }
453                 ok = False
454             self._usable = ok != False # still usable if ok is True or None
455             if self._result.get('status_code', None):
456                 # Client is functional. See comment in get().
457                 self._put_user_agent(curl)
458             else:
459                 curl.close()
460             if not ok:
461                 _logger.debug("Request fail: PUT %s => %s: %s",
462                               url, type(self._result['error']), str(self._result['error']))
463                 return False
464             return True
465
466         def _setcurltimeouts(self, curl, timeouts):
467             if not timeouts:
468                 return
469             elif isinstance(timeouts, tuple):
470                 conn_t, xfer_t = timeouts
471             else:
472                 conn_t, xfer_t = (timeouts, timeouts)
473             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
474             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
475
476         def _headerfunction(self, header_line):
477             header_line = header_line.decode('iso-8859-1')
478             if ':' in header_line:
479                 name, value = header_line.split(':', 1)
480                 name = name.strip().lower()
481                 value = value.strip()
482             elif self._headers:
483                 name = self._lastheadername
484                 value = self._headers[name] + ' ' + header_line.strip()
485             elif header_line.startswith('HTTP/'):
486                 name = 'x-status-line'
487                 value = header_line
488             else:
489                 _logger.error("Unexpected header line: %s", header_line)
490                 return
491             self._lastheadername = name
492             self._headers[name] = value
493             # Returning None implies all bytes were written
494
495
496     class KeepWriterThread(threading.Thread):
497         """
498         Write a blob of data to the given Keep server. On success, call
499         save_response() of the given ThreadLimiter to save the returned
500         locator.
501         """
502         def __init__(self, keep_service, **kwargs):
503             super(KeepClient.KeepWriterThread, self).__init__()
504             self.service = keep_service
505             self.args = kwargs
506             self._success = False
507
508         def success(self):
509             return self._success
510
511         def run(self):
512             with self.args['thread_limiter'] as limiter:
513                 if not limiter.shall_i_proceed():
514                     # My turn arrived, but the job has been done without
515                     # me.
516                     return
517                 self.run_with_limiter(limiter)
518
519         def run_with_limiter(self, limiter):
520             if self.service.finished():
521                 return
522             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
523                           str(threading.current_thread()),
524                           self.args['data_hash'],
525                           len(self.args['data']),
526                           self.args['service_root'])
527             self._success = bool(self.service.put(
528                 self.args['data_hash'],
529                 self.args['data'],
530                 timeout=self.args.get('timeout', None)))
531             result = self.service.last_result()
532             if self._success:
533                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
534                               str(threading.current_thread()),
535                               self.args['data_hash'],
536                               len(self.args['data']),
537                               self.args['service_root'])
538                 # Tick the 'done' counter for the number of replica
539                 # reported stored by the server, for the case that
540                 # we're talking to a proxy or other backend that
541                 # stores to multiple copies for us.
542                 try:
543                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
544                 except (KeyError, ValueError):
545                     replicas_stored = 1
546                 limiter.save_response(result['body'].strip(), replicas_stored)
547             elif result.get('status_code', None):
548                 _logger.debug("Request fail: PUT %s => %s %s",
549                               self.args['data_hash'],
550                               result['status_code'],
551                               result['body'])
552
553
554     def __init__(self, api_client=None, proxy=None,
555                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
556                  api_token=None, local_store=None, block_cache=None,
557                  num_retries=0, session=None):
558         """Initialize a new KeepClient.
559
560         Arguments:
561         :api_client:
562           The API client to use to find Keep services.  If not
563           provided, KeepClient will build one from available Arvados
564           configuration.
565
566         :proxy:
567           If specified, this KeepClient will send requests to this Keep
568           proxy.  Otherwise, KeepClient will fall back to the setting of the
569           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
570           KeepClient does not use a proxy, pass in an empty string.
571
572         :timeout:
573           The initial timeout (in seconds) for HTTP requests to Keep
574           non-proxy servers.  A tuple of two floats is interpreted as
575           (connection_timeout, read_timeout): see
576           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
577           Because timeouts are often a result of transient server load, the
578           actual connection timeout will be increased by a factor of two on
579           each retry.
580           Default: (2, 300).
581
582         :proxy_timeout:
583           The initial timeout (in seconds) for HTTP requests to
584           Keep proxies. A tuple of two floats is interpreted as
585           (connection_timeout, read_timeout). The behavior described
586           above for adjusting connection timeouts on retry also applies.
587           Default: (20, 300).
588
589         :api_token:
590           If you're not using an API client, but only talking
591           directly to a Keep proxy, this parameter specifies an API token
592           to authenticate Keep requests.  It is an error to specify both
593           api_client and api_token.  If you specify neither, KeepClient
594           will use one available from the Arvados configuration.
595
596         :local_store:
597           If specified, this KeepClient will bypass Keep
598           services, and save data to the named directory.  If unspecified,
599           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
600           environment variable.  If you want to ensure KeepClient does not
601           use local storage, pass in an empty string.  This is primarily
602           intended to mock a server for testing.
603
604         :num_retries:
605           The default number of times to retry failed requests.
606           This will be used as the default num_retries value when get() and
607           put() are called.  Default 0.
608         """
609         self.lock = threading.Lock()
610         if proxy is None:
611             proxy = config.get('ARVADOS_KEEP_PROXY')
612         if api_token is None:
613             if api_client is None:
614                 api_token = config.get('ARVADOS_API_TOKEN')
615             else:
616                 api_token = api_client.api_token
617         elif api_client is not None:
618             raise ValueError(
619                 "can't build KeepClient with both API client and token")
620         if local_store is None:
621             local_store = os.environ.get('KEEP_LOCAL_STORE')
622
623         self.block_cache = block_cache if block_cache else KeepBlockCache()
624         self.timeout = timeout
625         self.proxy_timeout = proxy_timeout
626         self._user_agent_pool = Queue.LifoQueue()
627
628         if local_store:
629             self.local_store = local_store
630             self.get = self.local_store_get
631             self.put = self.local_store_put
632         else:
633             self.num_retries = num_retries
634             if proxy:
635                 if not proxy.endswith('/'):
636                     proxy += '/'
637                 self.api_token = api_token
638                 self._gateway_services = {}
639                 self._keep_services = [{
640                     'uuid': 'proxy',
641                     '_service_root': proxy,
642                     }]
643                 self.using_proxy = True
644                 self._static_services_list = True
645             else:
646                 # It's important to avoid instantiating an API client
647                 # unless we actually need one, for testing's sake.
648                 if api_client is None:
649                     api_client = arvados.api('v1')
650                 self.api_client = api_client
651                 self.api_token = api_client.api_token
652                 self._gateway_services = {}
653                 self._keep_services = None
654                 self.using_proxy = None
655                 self._static_services_list = False
656
657     def current_timeout(self, attempt_number):
658         """Return the appropriate timeout to use for this client.
659
660         The proxy timeout setting if the backend service is currently a proxy,
661         the regular timeout setting otherwise.  The `attempt_number` indicates
662         how many times the operation has been tried already (starting from 0
663         for the first try), and scales the connection timeout portion of the
664         return value accordingly.
665
666         """
667         # TODO(twp): the timeout should be a property of a
668         # KeepService, not a KeepClient. See #4488.
669         t = self.proxy_timeout if self.using_proxy else self.timeout
670         return (t[0] * (1 << attempt_number), t[1])
671
672     def build_services_list(self, force_rebuild=False):
673         if (self._static_services_list or
674               (self._keep_services and not force_rebuild)):
675             return
676         with self.lock:
677             try:
678                 keep_services = self.api_client.keep_services().accessible()
679             except Exception:  # API server predates Keep services.
680                 keep_services = self.api_client.keep_disks().list()
681
682             accessible = keep_services.execute().get('items')
683             if not accessible:
684                 raise arvados.errors.NoKeepServersError()
685
686             # Precompute the base URI for each service.
687             for r in accessible:
688                 host = r['service_host']
689                 if not host.startswith('[') and host.find(':') >= 0:
690                     # IPv6 URIs must be formatted like http://[::1]:80/...
691                     host = '[' + host + ']'
692                 r['_service_root'] = "{}://{}:{:d}/".format(
693                     'https' if r['service_ssl_flag'] else 'http',
694                     host,
695                     r['service_port'])
696
697             # Gateway services are only used when specified by UUID,
698             # so there's nothing to gain by filtering them by
699             # service_type.
700             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
701             _logger.debug(str(self._gateway_services))
702
703             self._keep_services = [
704                 ks for ks in accessible
705                 if ks.get('service_type') in ['disk', 'proxy']]
706             _logger.debug(str(self._keep_services))
707
708             self.using_proxy = any(ks.get('service_type') == 'proxy'
709                                    for ks in self._keep_services)
710
711     def _service_weight(self, data_hash, service_uuid):
712         """Compute the weight of a Keep service endpoint for a data
713         block with a known hash.
714
715         The weight is md5(h + u) where u is the last 15 characters of
716         the service endpoint's UUID.
717         """
718         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
719
720     def weighted_service_roots(self, locator, force_rebuild=False):
721         """Return an array of Keep service endpoints, in the order in
722         which they should be probed when reading or writing data with
723         the given hash+hints.
724         """
725         self.build_services_list(force_rebuild)
726
727         sorted_roots = []
728
729         # Use the services indicated by the given +K@... remote
730         # service hints, if any are present and can be resolved to a
731         # URI.
732         for hint in locator.hints:
733             if hint.startswith('K@'):
734                 if len(hint) == 7:
735                     sorted_roots.append(
736                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
737                 elif len(hint) == 29:
738                     svc = self._gateway_services.get(hint[2:])
739                     if svc:
740                         sorted_roots.append(svc['_service_root'])
741
742         # Sort the available local services by weight (heaviest first)
743         # for this locator, and return their service_roots (base URIs)
744         # in that order.
745         sorted_roots.extend([
746             svc['_service_root'] for svc in sorted(
747                 self._keep_services,
748                 reverse=True,
749                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
750         _logger.debug("{}: {}".format(locator, sorted_roots))
751         return sorted_roots
752
753     def map_new_services(self, roots_map, locator, force_rebuild, **headers):
754         # roots_map is a dictionary, mapping Keep service root strings
755         # to KeepService objects.  Poll for Keep services, and add any
756         # new ones to roots_map.  Return the current list of local
757         # root strings.
758         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
759         local_roots = self.weighted_service_roots(locator, force_rebuild)
760         for root in local_roots:
761             if root not in roots_map:
762                 roots_map[root] = self.KeepService(
763                     root, self._user_agent_pool, **headers)
764         return local_roots
765
766     @staticmethod
767     def _check_loop_result(result):
768         # KeepClient RetryLoops should save results as a 2-tuple: the
769         # actual result of the request, and the number of servers available
770         # to receive the request this round.
771         # This method returns True if there's a real result, False if
772         # there are no more servers available, otherwise None.
773         if isinstance(result, Exception):
774             return None
775         result, tried_server_count = result
776         if (result is not None) and (result is not False):
777             return True
778         elif tried_server_count < 1:
779             _logger.info("No more Keep services to try; giving up")
780             return False
781         else:
782             return None
783
784     def get_from_cache(self, loc):
785         """Fetch a block only if is in the cache, otherwise return None."""
786         slot = self.block_cache.get(loc)
787         if slot is not None and slot.ready.is_set():
788             return slot.get()
789         else:
790             return None
791
792     @retry.retry_method
793     def get(self, loc_s, num_retries=None):
794         """Get data from Keep.
795
796         This method fetches one or more blocks of data from Keep.  It
797         sends a request each Keep service registered with the API
798         server (or the proxy provided when this client was
799         instantiated), then each service named in location hints, in
800         sequence.  As soon as one service provides the data, it's
801         returned.
802
803         Arguments:
804         * loc_s: A string of one or more comma-separated locators to fetch.
805           This method returns the concatenation of these blocks.
806         * num_retries: The number of times to retry GET requests to
807           *each* Keep server if it returns temporary failures, with
808           exponential backoff.  Note that, in each loop, the method may try
809           to fetch data from every available Keep service, along with any
810           that are named in location hints in the locator.  The default value
811           is set when the KeepClient is initialized.
812         """
813         if ',' in loc_s:
814             return ''.join(self.get(x) for x in loc_s.split(','))
815         locator = KeepLocator(loc_s)
816         slot, first = self.block_cache.reserve_cache(locator.md5sum)
817         if not first:
818             v = slot.get()
819             return v
820
821         # If the locator has hints specifying a prefix (indicating a
822         # remote keepproxy) or the UUID of a local gateway service,
823         # read data from the indicated service(s) instead of the usual
824         # list of local disk services.
825         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
826                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
827         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
828                            for hint in locator.hints if (
829                                    hint.startswith('K@') and
830                                    len(hint) == 29 and
831                                    self._gateway_services.get(hint[2:])
832                                    )])
833         # Map root URLs to their KeepService objects.
834         roots_map = {
835             root: self.KeepService(root, self._user_agent_pool)
836             for root in hint_roots
837         }
838
839         # See #3147 for a discussion of the loop implementation.  Highlights:
840         # * Refresh the list of Keep services after each failure, in case
841         #   it's being updated.
842         # * Retry until we succeed, we're out of retries, or every available
843         #   service has returned permanent failure.
844         sorted_roots = []
845         roots_map = {}
846         blob = None
847         loop = retry.RetryLoop(num_retries, self._check_loop_result,
848                                backoff_start=2)
849         for tries_left in loop:
850             try:
851                 sorted_roots = self.map_new_services(
852                     roots_map, locator,
853                     force_rebuild=(tries_left < num_retries))
854             except Exception as error:
855                 loop.save_result(error)
856                 continue
857
858             # Query KeepService objects that haven't returned
859             # permanent failure, in our specified shuffle order.
860             services_to_try = [roots_map[root]
861                                for root in sorted_roots
862                                if roots_map[root].usable()]
863             for keep_service in services_to_try:
864                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
865                 if blob is not None:
866                     break
867             loop.save_result((blob, len(services_to_try)))
868
869         # Always cache the result, then return it if we succeeded.
870         slot.set(blob)
871         self.block_cache.cap_cache()
872         if loop.success():
873             return blob
874
875         # Q: Including 403 is necessary for the Keep tests to continue
876         # passing, but maybe they should expect KeepReadError instead?
877         not_founds = sum(1 for key in sorted_roots
878                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
879         service_errors = ((key, roots_map[key].last_result()['error'])
880                           for key in sorted_roots)
881         if not roots_map:
882             raise arvados.errors.KeepReadError(
883                 "failed to read {}: no Keep services available ({})".format(
884                     loc_s, loop.last_result()))
885         elif not_founds == len(sorted_roots):
886             raise arvados.errors.NotFoundError(
887                 "{} not found".format(loc_s), service_errors)
888         else:
889             raise arvados.errors.KeepReadError(
890                 "failed to read {}".format(loc_s), service_errors, label="service")
891
892     @retry.retry_method
893     def put(self, data, copies=2, num_retries=None):
894         """Save data in Keep.
895
896         This method will get a list of Keep services from the API server, and
897         send the data to each one simultaneously in a new thread.  Once the
898         uploads are finished, if enough copies are saved, this method returns
899         the most recent HTTP response body.  If requests fail to upload
900         enough copies, this method raises KeepWriteError.
901
902         Arguments:
903         * data: The string of data to upload.
904         * copies: The number of copies that the user requires be saved.
905           Default 2.
906         * num_retries: The number of times to retry PUT requests to
907           *each* Keep server if it returns temporary failures, with
908           exponential backoff.  The default value is set when the
909           KeepClient is initialized.
910         """
911
912         if isinstance(data, unicode):
913             data = data.encode("ascii")
914         elif not isinstance(data, str):
915             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
916
917         data_hash = hashlib.md5(data).hexdigest()
918         if copies < 1:
919             return data_hash
920         locator = KeepLocator(data_hash + '+' + str(len(data)))
921
922         headers = {}
923         if self.using_proxy:
924             # Tell the proxy how many copies we want it to store
925             headers['X-Keep-Desired-Replication'] = str(copies)
926         roots_map = {}
927         thread_limiter = KeepClient.ThreadLimiter(copies)
928         loop = retry.RetryLoop(num_retries, self._check_loop_result,
929                                backoff_start=2)
930         for tries_left in loop:
931             try:
932                 local_roots = self.map_new_services(
933                     roots_map, locator,
934                     force_rebuild=(tries_left < num_retries), **headers)
935             except Exception as error:
936                 loop.save_result(error)
937                 continue
938
939             threads = []
940             for service_root, ks in roots_map.iteritems():
941                 if ks.finished():
942                     continue
943                 t = KeepClient.KeepWriterThread(
944                     ks,
945                     data=data,
946                     data_hash=data_hash,
947                     service_root=service_root,
948                     thread_limiter=thread_limiter,
949                     timeout=self.current_timeout(num_retries-tries_left))
950                 t.start()
951                 threads.append(t)
952             for t in threads:
953                 t.join()
954             loop.save_result((thread_limiter.done() >= copies, len(threads)))
955
956         if loop.success():
957             return thread_limiter.response()
958         if not roots_map:
959             raise arvados.errors.KeepWriteError(
960                 "failed to write {}: no Keep services available ({})".format(
961                     data_hash, loop.last_result()))
962         else:
963             service_errors = ((key, roots_map[key].last_result()['error'])
964                               for key in local_roots
965                               if roots_map[key].last_result()['error'])
966             raise arvados.errors.KeepWriteError(
967                 "failed to write {} (wanted {} copies but wrote {})".format(
968                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
969
970     def local_store_put(self, data, copies=1, num_retries=None):
971         """A stub for put().
972
973         This method is used in place of the real put() method when
974         using local storage (see constructor's local_store argument).
975
976         copies and num_retries arguments are ignored: they are here
977         only for the sake of offering the same call signature as
978         put().
979
980         Data stored this way can be retrieved via local_store_get().
981         """
982         md5 = hashlib.md5(data).hexdigest()
983         locator = '%s+%d' % (md5, len(data))
984         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
985             f.write(data)
986         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
987                   os.path.join(self.local_store, md5))
988         return locator
989
990     def local_store_get(self, loc_s, num_retries=None):
991         """Companion to local_store_put()."""
992         try:
993             locator = KeepLocator(loc_s)
994         except ValueError:
995             raise arvados.errors.NotFoundError(
996                 "Invalid data locator: '%s'" % loc_s)
997         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
998             return ''
999         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1000             return f.read()
1001
1002     def is_cached(self, locator):
1003         return self.block_cache.reserve_cache(expect_hash)