5562: Add support for "Expect: 100-Continue" flow for PUT.
[arvados.git] / sdk / python / arvados / keep.py
1 import bz2
2 import datetime
3 import fcntl
4 import functools
5 import gflags
6 import hashlib
7 import json
8 import logging
9 import os
10 import pprint
11 import pycurl
12 import Queue
13 import re
14 import socket
15 import ssl
16 import string
17 import StringIO
18 import subprocess
19 import sys
20 import threading
21 import time
22 import timer
23 import types
24 import UserDict
25 import zlib
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.retry as retry
31 import arvados.util
32
33 _logger = logging.getLogger('arvados.keep')
34 global_client_object = None
35
36
37 class KeepLocator(object):
38     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
39     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
40
41     def __init__(self, locator_str):
42         self.hints = []
43         self._perm_sig = None
44         self._perm_expiry = None
45         pieces = iter(locator_str.split('+'))
46         self.md5sum = next(pieces)
47         try:
48             self.size = int(next(pieces))
49         except StopIteration:
50             self.size = None
51         for hint in pieces:
52             if self.HINT_RE.match(hint) is None:
53                 raise ValueError("invalid hint format: {}".format(hint))
54             elif hint.startswith('A'):
55                 self.parse_permission_hint(hint)
56             else:
57                 self.hints.append(hint)
58
59     def __str__(self):
60         return '+'.join(
61             str(s) for s in [self.md5sum, self.size,
62                              self.permission_hint()] + self.hints
63             if s is not None)
64
65     def stripped(self):
66         if self.size is not None:
67             return "%s+%i" % (self.md5sum, self.size)
68         else:
69             return self.md5sum
70
71     def _make_hex_prop(name, length):
72         # Build and return a new property with the given name that
73         # must be a hex string of the given length.
74         data_name = '_{}'.format(name)
75         def getter(self):
76             return getattr(self, data_name)
77         def setter(self, hex_str):
78             if not arvados.util.is_hex(hex_str, length):
79                 raise ValueError("{} must be a {}-digit hex string: {}".
80                                  format(name, length, hex_str))
81             setattr(self, data_name, hex_str)
82         return property(getter, setter)
83
84     md5sum = _make_hex_prop('md5sum', 32)
85     perm_sig = _make_hex_prop('perm_sig', 40)
86
87     @property
88     def perm_expiry(self):
89         return self._perm_expiry
90
91     @perm_expiry.setter
92     def perm_expiry(self, value):
93         if not arvados.util.is_hex(value, 1, 8):
94             raise ValueError(
95                 "permission timestamp must be a hex Unix timestamp: {}".
96                 format(value))
97         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
98
99     def permission_hint(self):
100         data = [self.perm_sig, self.perm_expiry]
101         if None in data:
102             return None
103         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
104         return "A{}@{:08x}".format(*data)
105
106     def parse_permission_hint(self, s):
107         try:
108             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
109         except IndexError:
110             raise ValueError("bad permission hint {}".format(s))
111
112     def permission_expired(self, as_of_dt=None):
113         if self.perm_expiry is None:
114             return False
115         elif as_of_dt is None:
116             as_of_dt = datetime.datetime.now()
117         return self.perm_expiry <= as_of_dt
118
119
120 class Keep(object):
121     """Simple interface to a global KeepClient object.
122
123     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
124     own API client.  The global KeepClient will build an API client from the
125     current Arvados configuration, which may not match the one you built.
126     """
127     _last_key = None
128
129     @classmethod
130     def global_client_object(cls):
131         global global_client_object
132         # Previously, KeepClient would change its behavior at runtime based
133         # on these configuration settings.  We simulate that behavior here
134         # by checking the values and returning a new KeepClient if any of
135         # them have changed.
136         key = (config.get('ARVADOS_API_HOST'),
137                config.get('ARVADOS_API_TOKEN'),
138                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139                config.get('ARVADOS_KEEP_PROXY'),
140                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141                os.environ.get('KEEP_LOCAL_STORE'))
142         if (global_client_object is None) or (cls._last_key != key):
143             global_client_object = KeepClient()
144             cls._last_key = key
145         return global_client_object
146
147     @staticmethod
148     def get(locator, **kwargs):
149         return Keep.global_client_object().get(locator, **kwargs)
150
151     @staticmethod
152     def put(data, **kwargs):
153         return Keep.global_client_object().put(data, **kwargs)
154
155 class KeepBlockCache(object):
156     # Default RAM cache is 256MiB
157     def __init__(self, cache_max=(256 * 1024 * 1024)):
158         self.cache_max = cache_max
159         self._cache = []
160         self._cache_lock = threading.Lock()
161
162     class CacheSlot(object):
163         def __init__(self, locator):
164             self.locator = locator
165             self.ready = threading.Event()
166             self.content = None
167
168         def get(self):
169             self.ready.wait()
170             return self.content
171
172         def set(self, value):
173             self.content = value
174             self.ready.set()
175
176         def size(self):
177             if self.content is None:
178                 return 0
179             else:
180                 return len(self.content)
181
182     def cap_cache(self):
183         '''Cap the cache size to self.cache_max'''
184         with self._cache_lock:
185             # Select all slots except those where ready.is_set() and content is
186             # None (that means there was an error reading the block).
187             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
188             sm = sum([slot.size() for slot in self._cache])
189             while len(self._cache) > 0 and sm > self.cache_max:
190                 for i in xrange(len(self._cache)-1, -1, -1):
191                     if self._cache[i].ready.is_set():
192                         del self._cache[i]
193                         break
194                 sm = sum([slot.size() for slot in self._cache])
195
196     def _get(self, locator):
197         # Test if the locator is already in the cache
198         for i in xrange(0, len(self._cache)):
199             if self._cache[i].locator == locator:
200                 n = self._cache[i]
201                 if i != 0:
202                     # move it to the front
203                     del self._cache[i]
204                     self._cache.insert(0, n)
205                 return n
206         return None
207
208     def get(self, locator):
209         with self._cache_lock:
210             return self._get(locator)
211
212     def reserve_cache(self, locator):
213         '''Reserve a cache slot for the specified locator,
214         or return the existing slot.'''
215         with self._cache_lock:
216             n = self._get(locator)
217             if n:
218                 return n, False
219             else:
220                 # Add a new cache slot for the locator
221                 n = KeepBlockCache.CacheSlot(locator)
222                 self._cache.insert(0, n)
223                 return n, True
224
225 class KeepClient(object):
226
227     # Default Keep server connection timeout:  2 seconds
228     # Default Keep server read timeout:      300 seconds
229     # Default Keep proxy connection timeout:  20 seconds
230     # Default Keep proxy read timeout:       300 seconds
231     DEFAULT_TIMEOUT = (2, 300)
232     DEFAULT_PROXY_TIMEOUT = (20, 300)
233
234     class ThreadLimiter(object):
235         """
236         Limit the number of threads running at a given time to
237         {desired successes} minus {successes reported}. When successes
238         reported == desired, wake up the remaining threads and tell
239         them to quit.
240
241         Should be used in a "with" block.
242         """
243         def __init__(self, todo):
244             self._todo = todo
245             self._done = 0
246             self._response = None
247             self._todo_lock = threading.Semaphore(todo)
248             self._done_lock = threading.Lock()
249
250         def __enter__(self):
251             self._todo_lock.acquire()
252             return self
253
254         def __exit__(self, type, value, traceback):
255             self._todo_lock.release()
256
257         def shall_i_proceed(self):
258             """
259             Return true if the current thread should do stuff. Return
260             false if the current thread should just stop.
261             """
262             with self._done_lock:
263                 return (self._done < self._todo)
264
265         def save_response(self, response_body, replicas_stored):
266             """
267             Records a response body (a locator, possibly signed) returned by
268             the Keep server.  It is not necessary to save more than
269             one response, since we presume that any locator returned
270             in response to a successful request is valid.
271             """
272             with self._done_lock:
273                 self._done += replicas_stored
274                 self._response = response_body
275
276         def response(self):
277             """
278             Returns the body from the response to a PUT request.
279             """
280             with self._done_lock:
281                 return self._response
282
283         def done(self):
284             """
285             Return how many successes were reported.
286             """
287             with self._done_lock:
288                 return self._done
289
290
291     class KeepService(object):
292         """Make requests to a single Keep service, and track results.
293
294         A KeepService is intended to last long enough to perform one
295         transaction (GET or PUT) against one Keep service. This can
296         involve calling either get() or put() multiple times in order
297         to retry after transient failures. However, calling both get()
298         and put() on a single instance -- or using the same instance
299         to access two different Keep services -- will not produce
300         sensible behavior.
301         """
302
303         HTTP_ERRORS = (
304             socket.error,
305             ssl.SSLError,
306             arvados.errors.HttpError,
307         )
308
309         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
310             self.root = root
311             self._user_agent_pool = user_agent_pool
312             self._result = {'error': None}
313             self._usable = True
314             self._session = None
315             self.get_headers = {'Accept': 'application/octet-stream'}
316             self.get_headers.update(headers)
317             self.put_headers = headers
318
319         def usable(self):
320             """Is it worth attempting a request?"""
321             return self._usable
322
323         def finished(self):
324             """Did the request succeed or encounter permanent failure?"""
325             return self._result['error'] == False or not self._usable
326
327         def last_result(self):
328             return self._result
329
330         def _get_user_agent(self):
331             try:
332                 return self._user_agent_pool.get(False)
333             except Queue.Empty:
334                 return pycurl.Curl()
335
336         def _put_user_agent(self, ua):
337             try:
338                 ua.reset()
339                 self._user_agent_pool.put(ua, False)
340             except:
341                 ua.close()
342
343         @staticmethod
344         def _socket_open(family, socktype, protocol, address=None):
345             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
346             s = socket.socket(family, socktype, protocol)
347             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
348             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
349             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
350             return s
351
352         def get(self, locator, timeout=None):
353             # locator is a KeepLocator object.
354             url = self.root + str(locator)
355             _logger.debug("Request: GET %s", url)
356             curl = self._get_user_agent()
357             try:
358                 with timer.Timer() as t:
359                     self._headers = {}
360                     response_body = StringIO.StringIO()
361                     curl.setopt(pycurl.NOSIGNAL, 1)
362                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
363                     curl.setopt(pycurl.URL, url.encode('utf-8'))
364                     curl.setopt(pycurl.HTTPHEADER, [
365                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
366                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
367                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
368                     self._setcurltimeouts(curl, timeout)
369                     try:
370                         curl.perform()
371                     except Exception as e:
372                         raise arvados.errors.HttpError(0, str(e))
373                     self._result = {
374                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
375                         'body': response_body.getvalue(),
376                         'headers': self._headers,
377                         'error': False,
378                     }
379                 ok = retry.check_http_response_success(self._result['status_code'])
380                 if not ok:
381                     self._result['error'] = arvados.errors.HttpError(
382                         self._result['status_code'],
383                         self._headers.get('x-status-line', 'Error'))
384             except self.HTTP_ERRORS as e:
385                 self._result = {
386                     'error': e,
387                 }
388                 ok = False
389             self._usable = ok != False
390             if self._result.get('status_code', None):
391                 # The client worked well enough to get an HTTP status
392                 # code, so presumably any problems are just on the
393                 # server side and it's OK to reuse the client.
394                 self._put_user_agent(curl)
395             else:
396                 # Don't return this client to the pool, in case it's
397                 # broken.
398                 curl.close()
399             if not ok:
400                 _logger.debug("Request fail: GET %s => %s: %s",
401                               url, type(self._result['error']), str(self._result['error']))
402                 return None
403             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
404                          self._result['status_code'],
405                          len(self._result['body']),
406                          t.msecs,
407                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
408             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
409             if resp_md5 != locator.md5sum:
410                 _logger.warning("Checksum fail: md5(%s) = %s",
411                                 url, resp_md5)
412                 self._result['error'] = arvados.errors.HttpError(
413                     0, 'Checksum fail')
414                 return None
415             return self._result['body']
416
417         def put(self, hash_s, body, timeout=None):
418             url = self.root + hash_s
419             _logger.debug("Request: PUT %s", url)
420             curl = self._get_user_agent()
421             try:
422                 self._headers = {}
423                 body_reader = StringIO.StringIO(body)
424                 response_body = StringIO.StringIO()
425                 curl.setopt(pycurl.NOSIGNAL, 1)
426                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
427                 curl.setopt(pycurl.URL, url.encode('utf-8'))
428                 curl.setopt(pycurl.UPLOAD, True)
429                 curl.setopt(pycurl.INFILESIZE, len(body))
430                 curl.setopt(pycurl.READFUNCTION, body_reader.read)
431                 curl.setopt(pycurl.HTTPHEADER, [
432                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
433                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
434                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
435                 self._setcurltimeouts(curl, timeout)
436                 try:
437                     curl.perform()
438                 except Exception as e:
439                     raise arvados.errors.HttpError(0, str(e))
440                 self._result = {
441                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
442                     'body': response_body.getvalue(),
443                     'headers': self._headers,
444                     'error': False,
445                 }
446                 ok = retry.check_http_response_success(self._result['status_code'])
447                 if not ok:
448                     self._result['error'] = arvados.errors.HttpError(
449                         self._result['status_code'],
450                         self._headers.get('x-status-line', 'Error'))
451             except self.HTTP_ERRORS as e:
452                 self._result = {
453                     'error': e,
454                 }
455                 ok = False
456             self._usable = ok != False # still usable if ok is True or None
457             if self._result.get('status_code', None):
458                 # Client is functional. See comment in get().
459                 self._put_user_agent(curl)
460             else:
461                 curl.close()
462             if not ok:
463                 _logger.debug("Request fail: PUT %s => %s: %s",
464                               url, type(self._result['error']), str(self._result['error']))
465                 return False
466             return True
467
468         def _setcurltimeouts(self, curl, timeouts):
469             if not timeouts:
470                 return
471             elif isinstance(timeouts, tuple):
472                 conn_t, xfer_t = timeouts
473             else:
474                 conn_t, xfer_t = (timeouts, timeouts)
475             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
476             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
477
478         def _headerfunction(self, header_line):
479             header_line = header_line.decode('iso-8859-1')
480             if ':' in header_line:
481                 name, value = header_line.split(':', 1)
482                 name = name.strip().lower()
483                 value = value.strip()
484             elif self._headers:
485                 name = self._lastheadername
486                 value = self._headers[name] + ' ' + header_line.strip()
487             elif header_line.startswith('HTTP/'):
488                 name = 'x-status-line'
489                 value = header_line
490             else:
491                 _logger.error("Unexpected header line: %s", header_line)
492                 return
493             self._lastheadername = name
494             self._headers[name] = value
495             # Returning None implies all bytes were written
496
497
498     class KeepWriterThread(threading.Thread):
499         """
500         Write a blob of data to the given Keep server. On success, call
501         save_response() of the given ThreadLimiter to save the returned
502         locator.
503         """
504         def __init__(self, keep_service, **kwargs):
505             super(KeepClient.KeepWriterThread, self).__init__()
506             self.service = keep_service
507             self.args = kwargs
508             self._success = False
509
510         def success(self):
511             return self._success
512
513         def run(self):
514             with self.args['thread_limiter'] as limiter:
515                 if not limiter.shall_i_proceed():
516                     # My turn arrived, but the job has been done without
517                     # me.
518                     return
519                 self.run_with_limiter(limiter)
520
521         def run_with_limiter(self, limiter):
522             if self.service.finished():
523                 return
524             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
525                           str(threading.current_thread()),
526                           self.args['data_hash'],
527                           len(self.args['data']),
528                           self.args['service_root'])
529             self._success = bool(self.service.put(
530                 self.args['data_hash'],
531                 self.args['data'],
532                 timeout=self.args.get('timeout', None)))
533             result = self.service.last_result()
534             if self._success:
535                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
536                               str(threading.current_thread()),
537                               self.args['data_hash'],
538                               len(self.args['data']),
539                               self.args['service_root'])
540                 # Tick the 'done' counter for the number of replica
541                 # reported stored by the server, for the case that
542                 # we're talking to a proxy or other backend that
543                 # stores to multiple copies for us.
544                 try:
545                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
546                 except (KeyError, ValueError):
547                     replicas_stored = 1
548                 limiter.save_response(result['body'].strip(), replicas_stored)
549             elif result.get('status_code', None):
550                 _logger.debug("Request fail: PUT %s => %s %s",
551                               self.args['data_hash'],
552                               result['status_code'],
553                               result['body'])
554
555
556     def __init__(self, api_client=None, proxy=None,
557                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
558                  api_token=None, local_store=None, block_cache=None,
559                  num_retries=0, session=None):
560         """Initialize a new KeepClient.
561
562         Arguments:
563         :api_client:
564           The API client to use to find Keep services.  If not
565           provided, KeepClient will build one from available Arvados
566           configuration.
567
568         :proxy:
569           If specified, this KeepClient will send requests to this Keep
570           proxy.  Otherwise, KeepClient will fall back to the setting of the
571           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
572           KeepClient does not use a proxy, pass in an empty string.
573
574         :timeout:
575           The initial timeout (in seconds) for HTTP requests to Keep
576           non-proxy servers.  A tuple of two floats is interpreted as
577           (connection_timeout, read_timeout): see
578           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
579           Because timeouts are often a result of transient server load, the
580           actual connection timeout will be increased by a factor of two on
581           each retry.
582           Default: (2, 300).
583
584         :proxy_timeout:
585           The initial timeout (in seconds) for HTTP requests to
586           Keep proxies. A tuple of two floats is interpreted as
587           (connection_timeout, read_timeout). The behavior described
588           above for adjusting connection timeouts on retry also applies.
589           Default: (20, 300).
590
591         :api_token:
592           If you're not using an API client, but only talking
593           directly to a Keep proxy, this parameter specifies an API token
594           to authenticate Keep requests.  It is an error to specify both
595           api_client and api_token.  If you specify neither, KeepClient
596           will use one available from the Arvados configuration.
597
598         :local_store:
599           If specified, this KeepClient will bypass Keep
600           services, and save data to the named directory.  If unspecified,
601           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
602           environment variable.  If you want to ensure KeepClient does not
603           use local storage, pass in an empty string.  This is primarily
604           intended to mock a server for testing.
605
606         :num_retries:
607           The default number of times to retry failed requests.
608           This will be used as the default num_retries value when get() and
609           put() are called.  Default 0.
610         """
611         self.lock = threading.Lock()
612         if proxy is None:
613             proxy = config.get('ARVADOS_KEEP_PROXY')
614         if api_token is None:
615             if api_client is None:
616                 api_token = config.get('ARVADOS_API_TOKEN')
617             else:
618                 api_token = api_client.api_token
619         elif api_client is not None:
620             raise ValueError(
621                 "can't build KeepClient with both API client and token")
622         if local_store is None:
623             local_store = os.environ.get('KEEP_LOCAL_STORE')
624
625         self.block_cache = block_cache if block_cache else KeepBlockCache()
626         self.timeout = timeout
627         self.proxy_timeout = proxy_timeout
628         self._user_agent_pool = Queue.LifoQueue()
629
630         if local_store:
631             self.local_store = local_store
632             self.get = self.local_store_get
633             self.put = self.local_store_put
634         else:
635             self.num_retries = num_retries
636             if proxy:
637                 if not proxy.endswith('/'):
638                     proxy += '/'
639                 self.api_token = api_token
640                 self._gateway_services = {}
641                 self._keep_services = [{
642                     'uuid': 'proxy',
643                     '_service_root': proxy,
644                     }]
645                 self.using_proxy = True
646                 self._static_services_list = True
647             else:
648                 # It's important to avoid instantiating an API client
649                 # unless we actually need one, for testing's sake.
650                 if api_client is None:
651                     api_client = arvados.api('v1')
652                 self.api_client = api_client
653                 self.api_token = api_client.api_token
654                 self._gateway_services = {}
655                 self._keep_services = None
656                 self.using_proxy = None
657                 self._static_services_list = False
658
659     def current_timeout(self, attempt_number):
660         """Return the appropriate timeout to use for this client.
661
662         The proxy timeout setting if the backend service is currently a proxy,
663         the regular timeout setting otherwise.  The `attempt_number` indicates
664         how many times the operation has been tried already (starting from 0
665         for the first try), and scales the connection timeout portion of the
666         return value accordingly.
667
668         """
669         # TODO(twp): the timeout should be a property of a
670         # KeepService, not a KeepClient. See #4488.
671         t = self.proxy_timeout if self.using_proxy else self.timeout
672         return (t[0] * (1 << attempt_number), t[1])
673
674     def build_services_list(self, force_rebuild=False):
675         if (self._static_services_list or
676               (self._keep_services and not force_rebuild)):
677             return
678         with self.lock:
679             try:
680                 keep_services = self.api_client.keep_services().accessible()
681             except Exception:  # API server predates Keep services.
682                 keep_services = self.api_client.keep_disks().list()
683
684             accessible = keep_services.execute().get('items')
685             if not accessible:
686                 raise arvados.errors.NoKeepServersError()
687
688             # Precompute the base URI for each service.
689             for r in accessible:
690                 host = r['service_host']
691                 if not host.startswith('[') and host.find(':') >= 0:
692                     # IPv6 URIs must be formatted like http://[::1]:80/...
693                     host = '[' + host + ']'
694                 r['_service_root'] = "{}://{}:{:d}/".format(
695                     'https' if r['service_ssl_flag'] else 'http',
696                     host,
697                     r['service_port'])
698
699             # Gateway services are only used when specified by UUID,
700             # so there's nothing to gain by filtering them by
701             # service_type.
702             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
703             _logger.debug(str(self._gateway_services))
704
705             self._keep_services = [
706                 ks for ks in accessible
707                 if ks.get('service_type') in ['disk', 'proxy']]
708             _logger.debug(str(self._keep_services))
709
710             self.using_proxy = any(ks.get('service_type') == 'proxy'
711                                    for ks in self._keep_services)
712
713     def _service_weight(self, data_hash, service_uuid):
714         """Compute the weight of a Keep service endpoint for a data
715         block with a known hash.
716
717         The weight is md5(h + u) where u is the last 15 characters of
718         the service endpoint's UUID.
719         """
720         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
721
722     def weighted_service_roots(self, locator, force_rebuild=False):
723         """Return an array of Keep service endpoints, in the order in
724         which they should be probed when reading or writing data with
725         the given hash+hints.
726         """
727         self.build_services_list(force_rebuild)
728
729         sorted_roots = []
730
731         # Use the services indicated by the given +K@... remote
732         # service hints, if any are present and can be resolved to a
733         # URI.
734         for hint in locator.hints:
735             if hint.startswith('K@'):
736                 if len(hint) == 7:
737                     sorted_roots.append(
738                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
739                 elif len(hint) == 29:
740                     svc = self._gateway_services.get(hint[2:])
741                     if svc:
742                         sorted_roots.append(svc['_service_root'])
743
744         # Sort the available local services by weight (heaviest first)
745         # for this locator, and return their service_roots (base URIs)
746         # in that order.
747         sorted_roots.extend([
748             svc['_service_root'] for svc in sorted(
749                 self._keep_services,
750                 reverse=True,
751                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
752         _logger.debug("{}: {}".format(locator, sorted_roots))
753         return sorted_roots
754
755     def map_new_services(self, roots_map, locator, force_rebuild, **headers):
756         # roots_map is a dictionary, mapping Keep service root strings
757         # to KeepService objects.  Poll for Keep services, and add any
758         # new ones to roots_map.  Return the current list of local
759         # root strings.
760         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
761         local_roots = self.weighted_service_roots(locator, force_rebuild)
762         for root in local_roots:
763             if root not in roots_map:
764                 roots_map[root] = self.KeepService(
765                     root, self._user_agent_pool, **headers)
766         return local_roots
767
768     @staticmethod
769     def _check_loop_result(result):
770         # KeepClient RetryLoops should save results as a 2-tuple: the
771         # actual result of the request, and the number of servers available
772         # to receive the request this round.
773         # This method returns True if there's a real result, False if
774         # there are no more servers available, otherwise None.
775         if isinstance(result, Exception):
776             return None
777         result, tried_server_count = result
778         if (result is not None) and (result is not False):
779             return True
780         elif tried_server_count < 1:
781             _logger.info("No more Keep services to try; giving up")
782             return False
783         else:
784             return None
785
786     def get_from_cache(self, loc):
787         """Fetch a block only if is in the cache, otherwise return None."""
788         slot = self.block_cache.get(loc)
789         if slot is not None and slot.ready.is_set():
790             return slot.get()
791         else:
792             return None
793
794     @retry.retry_method
795     def get(self, loc_s, num_retries=None):
796         """Get data from Keep.
797
798         This method fetches one or more blocks of data from Keep.  It
799         sends a request each Keep service registered with the API
800         server (or the proxy provided when this client was
801         instantiated), then each service named in location hints, in
802         sequence.  As soon as one service provides the data, it's
803         returned.
804
805         Arguments:
806         * loc_s: A string of one or more comma-separated locators to fetch.
807           This method returns the concatenation of these blocks.
808         * num_retries: The number of times to retry GET requests to
809           *each* Keep server if it returns temporary failures, with
810           exponential backoff.  Note that, in each loop, the method may try
811           to fetch data from every available Keep service, along with any
812           that are named in location hints in the locator.  The default value
813           is set when the KeepClient is initialized.
814         """
815         if ',' in loc_s:
816             return ''.join(self.get(x) for x in loc_s.split(','))
817         locator = KeepLocator(loc_s)
818         slot, first = self.block_cache.reserve_cache(locator.md5sum)
819         if not first:
820             v = slot.get()
821             return v
822
823         # If the locator has hints specifying a prefix (indicating a
824         # remote keepproxy) or the UUID of a local gateway service,
825         # read data from the indicated service(s) instead of the usual
826         # list of local disk services.
827         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
828                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
829         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
830                            for hint in locator.hints if (
831                                    hint.startswith('K@') and
832                                    len(hint) == 29 and
833                                    self._gateway_services.get(hint[2:])
834                                    )])
835         # Map root URLs to their KeepService objects.
836         roots_map = {
837             root: self.KeepService(root, self._user_agent_pool)
838             for root in hint_roots
839         }
840
841         # See #3147 for a discussion of the loop implementation.  Highlights:
842         # * Refresh the list of Keep services after each failure, in case
843         #   it's being updated.
844         # * Retry until we succeed, we're out of retries, or every available
845         #   service has returned permanent failure.
846         sorted_roots = []
847         roots_map = {}
848         blob = None
849         loop = retry.RetryLoop(num_retries, self._check_loop_result,
850                                backoff_start=2)
851         for tries_left in loop:
852             try:
853                 sorted_roots = self.map_new_services(
854                     roots_map, locator,
855                     force_rebuild=(tries_left < num_retries))
856             except Exception as error:
857                 loop.save_result(error)
858                 continue
859
860             # Query KeepService objects that haven't returned
861             # permanent failure, in our specified shuffle order.
862             services_to_try = [roots_map[root]
863                                for root in sorted_roots
864                                if roots_map[root].usable()]
865             for keep_service in services_to_try:
866                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
867                 if blob is not None:
868                     break
869             loop.save_result((blob, len(services_to_try)))
870
871         # Always cache the result, then return it if we succeeded.
872         slot.set(blob)
873         self.block_cache.cap_cache()
874         if loop.success():
875             return blob
876
877         # Q: Including 403 is necessary for the Keep tests to continue
878         # passing, but maybe they should expect KeepReadError instead?
879         not_founds = sum(1 for key in sorted_roots
880                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
881         service_errors = ((key, roots_map[key].last_result()['error'])
882                           for key in sorted_roots)
883         if not roots_map:
884             raise arvados.errors.KeepReadError(
885                 "failed to read {}: no Keep services available ({})".format(
886                     loc_s, loop.last_result()))
887         elif not_founds == len(sorted_roots):
888             raise arvados.errors.NotFoundError(
889                 "{} not found".format(loc_s), service_errors)
890         else:
891             raise arvados.errors.KeepReadError(
892                 "failed to read {}".format(loc_s), service_errors, label="service")
893
894     @retry.retry_method
895     def put(self, data, copies=2, num_retries=None):
896         """Save data in Keep.
897
898         This method will get a list of Keep services from the API server, and
899         send the data to each one simultaneously in a new thread.  Once the
900         uploads are finished, if enough copies are saved, this method returns
901         the most recent HTTP response body.  If requests fail to upload
902         enough copies, this method raises KeepWriteError.
903
904         Arguments:
905         * data: The string of data to upload.
906         * copies: The number of copies that the user requires be saved.
907           Default 2.
908         * num_retries: The number of times to retry PUT requests to
909           *each* Keep server if it returns temporary failures, with
910           exponential backoff.  The default value is set when the
911           KeepClient is initialized.
912         """
913
914         if isinstance(data, unicode):
915             data = data.encode("ascii")
916         elif not isinstance(data, str):
917             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
918
919         data_hash = hashlib.md5(data).hexdigest()
920         if copies < 1:
921             return data_hash
922         locator = KeepLocator(data_hash + '+' + str(len(data)))
923
924         headers = {}
925         if self.using_proxy:
926             # Tell the proxy how many copies we want it to store
927             headers['X-Keep-Desired-Replication'] = str(copies)
928         roots_map = {}
929         thread_limiter = KeepClient.ThreadLimiter(copies)
930         loop = retry.RetryLoop(num_retries, self._check_loop_result,
931                                backoff_start=2)
932         for tries_left in loop:
933             try:
934                 local_roots = self.map_new_services(
935                     roots_map, locator,
936                     force_rebuild=(tries_left < num_retries), **headers)
937             except Exception as error:
938                 loop.save_result(error)
939                 continue
940
941             threads = []
942             for service_root, ks in roots_map.iteritems():
943                 if ks.finished():
944                     continue
945                 t = KeepClient.KeepWriterThread(
946                     ks,
947                     data=data,
948                     data_hash=data_hash,
949                     service_root=service_root,
950                     thread_limiter=thread_limiter,
951                     timeout=self.current_timeout(num_retries-tries_left))
952                 t.start()
953                 threads.append(t)
954             for t in threads:
955                 t.join()
956             loop.save_result((thread_limiter.done() >= copies, len(threads)))
957
958         if loop.success():
959             return thread_limiter.response()
960         if not roots_map:
961             raise arvados.errors.KeepWriteError(
962                 "failed to write {}: no Keep services available ({})".format(
963                     data_hash, loop.last_result()))
964         else:
965             service_errors = ((key, roots_map[key].last_result()['error'])
966                               for key in local_roots
967                               if roots_map[key].last_result()['error'])
968             raise arvados.errors.KeepWriteError(
969                 "failed to write {} (wanted {} copies but wrote {})".format(
970                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
971
972     def local_store_put(self, data, copies=1, num_retries=None):
973         """A stub for put().
974
975         This method is used in place of the real put() method when
976         using local storage (see constructor's local_store argument).
977
978         copies and num_retries arguments are ignored: they are here
979         only for the sake of offering the same call signature as
980         put().
981
982         Data stored this way can be retrieved via local_store_get().
983         """
984         md5 = hashlib.md5(data).hexdigest()
985         locator = '%s+%d' % (md5, len(data))
986         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
987             f.write(data)
988         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
989                   os.path.join(self.local_store, md5))
990         return locator
991
992     def local_store_get(self, loc_s, num_retries=None):
993         """Companion to local_store_put()."""
994         try:
995             locator = KeepLocator(loc_s)
996         except ValueError:
997             raise arvados.errors.NotFoundError(
998                 "Invalid data locator: '%s'" % loc_s)
999         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1000             return ''
1001         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1002             return f.read()
1003
1004     def is_cached(self, locator):
1005         return self.block_cache.reserve_cache(expect_hash)