Merge branch '3145-close-early' closes #3145
[arvados.git] / sdk / python / arvados / keep.py
1 import bz2
2 import datetime
3 import fcntl
4 import functools
5 import gflags
6 import hashlib
7 import json
8 import logging
9 import os
10 import pprint
11 import pycurl
12 import Queue
13 import re
14 import socket
15 import ssl
16 import string
17 import StringIO
18 import subprocess
19 import sys
20 import threading
21 import time
22 import timer
23 import types
24 import UserDict
25 import zlib
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.retry as retry
31 import arvados.util
32
33 _logger = logging.getLogger('arvados.keep')
34 global_client_object = None
35
36
37 class KeepLocator(object):
38     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
39     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
40
41     def __init__(self, locator_str):
42         self.hints = []
43         self._perm_sig = None
44         self._perm_expiry = None
45         pieces = iter(locator_str.split('+'))
46         self.md5sum = next(pieces)
47         try:
48             self.size = int(next(pieces))
49         except StopIteration:
50             self.size = None
51         for hint in pieces:
52             if self.HINT_RE.match(hint) is None:
53                 raise ValueError("invalid hint format: {}".format(hint))
54             elif hint.startswith('A'):
55                 self.parse_permission_hint(hint)
56             else:
57                 self.hints.append(hint)
58
59     def __str__(self):
60         return '+'.join(
61             str(s) for s in [self.md5sum, self.size,
62                              self.permission_hint()] + self.hints
63             if s is not None)
64
65     def stripped(self):
66         if self.size is not None:
67             return "%s+%i" % (self.md5sum, self.size)
68         else:
69             return self.md5sum
70
71     def _make_hex_prop(name, length):
72         # Build and return a new property with the given name that
73         # must be a hex string of the given length.
74         data_name = '_{}'.format(name)
75         def getter(self):
76             return getattr(self, data_name)
77         def setter(self, hex_str):
78             if not arvados.util.is_hex(hex_str, length):
79                 raise ValueError("{} must be a {}-digit hex string: {}".
80                                  format(name, length, hex_str))
81             setattr(self, data_name, hex_str)
82         return property(getter, setter)
83
84     md5sum = _make_hex_prop('md5sum', 32)
85     perm_sig = _make_hex_prop('perm_sig', 40)
86
87     @property
88     def perm_expiry(self):
89         return self._perm_expiry
90
91     @perm_expiry.setter
92     def perm_expiry(self, value):
93         if not arvados.util.is_hex(value, 1, 8):
94             raise ValueError(
95                 "permission timestamp must be a hex Unix timestamp: {}".
96                 format(value))
97         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
98
99     def permission_hint(self):
100         data = [self.perm_sig, self.perm_expiry]
101         if None in data:
102             return None
103         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
104         return "A{}@{:08x}".format(*data)
105
106     def parse_permission_hint(self, s):
107         try:
108             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
109         except IndexError:
110             raise ValueError("bad permission hint {}".format(s))
111
112     def permission_expired(self, as_of_dt=None):
113         if self.perm_expiry is None:
114             return False
115         elif as_of_dt is None:
116             as_of_dt = datetime.datetime.now()
117         return self.perm_expiry <= as_of_dt
118
119
120 class Keep(object):
121     """Simple interface to a global KeepClient object.
122
123     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
124     own API client.  The global KeepClient will build an API client from the
125     current Arvados configuration, which may not match the one you built.
126     """
127     _last_key = None
128
129     @classmethod
130     def global_client_object(cls):
131         global global_client_object
132         # Previously, KeepClient would change its behavior at runtime based
133         # on these configuration settings.  We simulate that behavior here
134         # by checking the values and returning a new KeepClient if any of
135         # them have changed.
136         key = (config.get('ARVADOS_API_HOST'),
137                config.get('ARVADOS_API_TOKEN'),
138                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
139                config.get('ARVADOS_KEEP_PROXY'),
140                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
141                os.environ.get('KEEP_LOCAL_STORE'))
142         if (global_client_object is None) or (cls._last_key != key):
143             global_client_object = KeepClient()
144             cls._last_key = key
145         return global_client_object
146
147     @staticmethod
148     def get(locator, **kwargs):
149         return Keep.global_client_object().get(locator, **kwargs)
150
151     @staticmethod
152     def put(data, **kwargs):
153         return Keep.global_client_object().put(data, **kwargs)
154
155 class KeepBlockCache(object):
156     # Default RAM cache is 256MiB
157     def __init__(self, cache_max=(256 * 1024 * 1024)):
158         self.cache_max = cache_max
159         self._cache = []
160         self._cache_lock = threading.Lock()
161
162     class CacheSlot(object):
163         def __init__(self, locator):
164             self.locator = locator
165             self.ready = threading.Event()
166             self.content = None
167
168         def get(self):
169             self.ready.wait()
170             return self.content
171
172         def set(self, value):
173             self.content = value
174             self.ready.set()
175
176         def size(self):
177             if self.content is None:
178                 return 0
179             else:
180                 return len(self.content)
181
182     def cap_cache(self):
183         '''Cap the cache size to self.cache_max'''
184         with self._cache_lock:
185             # Select all slots except those where ready.is_set() and content is
186             # None (that means there was an error reading the block).
187             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
188             sm = sum([slot.size() for slot in self._cache])
189             while len(self._cache) > 0 and sm > self.cache_max:
190                 for i in xrange(len(self._cache)-1, -1, -1):
191                     if self._cache[i].ready.is_set():
192                         del self._cache[i]
193                         break
194                 sm = sum([slot.size() for slot in self._cache])
195
196     def _get(self, locator):
197         # Test if the locator is already in the cache
198         for i in xrange(0, len(self._cache)):
199             if self._cache[i].locator == locator:
200                 n = self._cache[i]
201                 if i != 0:
202                     # move it to the front
203                     del self._cache[i]
204                     self._cache.insert(0, n)
205                 return n
206         return None
207
208     def get(self, locator):
209         with self._cache_lock:
210             return self._get(locator)
211
212     def reserve_cache(self, locator):
213         '''Reserve a cache slot for the specified locator,
214         or return the existing slot.'''
215         with self._cache_lock:
216             n = self._get(locator)
217             if n:
218                 return n, False
219             else:
220                 # Add a new cache slot for the locator
221                 n = KeepBlockCache.CacheSlot(locator)
222                 self._cache.insert(0, n)
223                 return n, True
224
225 class KeepClient(object):
226
227     # Default Keep server connection timeout:  2 seconds
228     # Default Keep server read timeout:      300 seconds
229     # Default Keep proxy connection timeout:  20 seconds
230     # Default Keep proxy read timeout:       300 seconds
231     DEFAULT_TIMEOUT = (2, 300)
232     DEFAULT_PROXY_TIMEOUT = (20, 300)
233
234     class ThreadLimiter(object):
235         """
236         Limit the number of threads running at a given time to
237         {desired successes} minus {successes reported}. When successes
238         reported == desired, wake up the remaining threads and tell
239         them to quit.
240
241         Should be used in a "with" block.
242         """
243         def __init__(self, todo):
244             self._todo = todo
245             self._done = 0
246             self._response = None
247             self._todo_lock = threading.Semaphore(todo)
248             self._done_lock = threading.Lock()
249
250         def __enter__(self):
251             self._todo_lock.acquire()
252             return self
253
254         def __exit__(self, type, value, traceback):
255             self._todo_lock.release()
256
257         def shall_i_proceed(self):
258             """
259             Return true if the current thread should do stuff. Return
260             false if the current thread should just stop.
261             """
262             with self._done_lock:
263                 return (self._done < self._todo)
264
265         def save_response(self, response_body, replicas_stored):
266             """
267             Records a response body (a locator, possibly signed) returned by
268             the Keep server.  It is not necessary to save more than
269             one response, since we presume that any locator returned
270             in response to a successful request is valid.
271             """
272             with self._done_lock:
273                 self._done += replicas_stored
274                 self._response = response_body
275
276         def response(self):
277             """
278             Returns the body from the response to a PUT request.
279             """
280             with self._done_lock:
281                 return self._response
282
283         def done(self):
284             """
285             Return how many successes were reported.
286             """
287             with self._done_lock:
288                 return self._done
289
290
291     class KeepService(object):
292         """Make requests to a single Keep service, and track results.
293
294         A KeepService is intended to last long enough to perform one
295         transaction (GET or PUT) against one Keep service. This can
296         involve calling either get() or put() multiple times in order
297         to retry after transient failures. However, calling both get()
298         and put() on a single instance -- or using the same instance
299         to access two different Keep services -- will not produce
300         sensible behavior.
301         """
302
303         HTTP_ERRORS = (
304             socket.error,
305             ssl.SSLError,
306             arvados.errors.HttpError,
307         )
308
309         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
310             self.root = root
311             self._user_agent_pool = user_agent_pool
312             self._result = {'error': None}
313             self._usable = True
314             self._session = None
315             self.get_headers = {'Accept': 'application/octet-stream'}
316             self.get_headers.update(headers)
317             self.put_headers = headers
318
319         def usable(self):
320             """Is it worth attempting a request?"""
321             return self._usable
322
323         def finished(self):
324             """Did the request succeed or encounter permanent failure?"""
325             return self._result['error'] == False or not self._usable
326
327         def last_result(self):
328             return self._result
329
330         def _get_user_agent(self):
331             try:
332                 return self._user_agent_pool.get(False)
333             except Queue.Empty:
334                 return pycurl.Curl()
335
336         def _put_user_agent(self, ua):
337             try:
338                 ua.reset()
339                 self._user_agent_pool.put(ua, False)
340             except:
341                 ua.close()
342
343         def _socket_open(self, family, socktype, protocol, address):
344             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
345             s = socket.socket(family, socktype, protocol)
346             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
347             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
348             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
349             return s
350
351         def get(self, locator, timeout=None):
352             # locator is a KeepLocator object.
353             url = self.root + str(locator)
354             _logger.debug("Request: GET %s", url)
355             curl = self._get_user_agent()
356             try:
357                 with timer.Timer() as t:
358                     self._headers = {}
359                     response_body = StringIO.StringIO()
360                     curl.setopt(pycurl.NOSIGNAL, 1)
361                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
362                     curl.setopt(pycurl.URL, url.encode('utf-8'))
363                     curl.setopt(pycurl.HTTPHEADER, [
364                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
365                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
366                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
367                     self._setcurltimeouts(curl, timeout)
368                     try:
369                         curl.perform()
370                     except Exception as e:
371                         raise arvados.errors.HttpError(0, str(e))
372                     self._result = {
373                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
374                         'body': response_body.getvalue(),
375                         'headers': self._headers,
376                         'error': False,
377                     }
378                 ok = retry.check_http_response_success(self._result['status_code'])
379                 if not ok:
380                     self._result['error'] = arvados.errors.HttpError(
381                         self._result['status_code'],
382                         self._headers.get('x-status-line', 'Error'))
383             except self.HTTP_ERRORS as e:
384                 self._result = {
385                     'error': e,
386                 }
387                 ok = False
388             self._usable = ok != False
389             if self._result.get('status_code', None):
390                 # The client worked well enough to get an HTTP status
391                 # code, so presumably any problems are just on the
392                 # server side and it's OK to reuse the client.
393                 self._put_user_agent(curl)
394             else:
395                 # Don't return this client to the pool, in case it's
396                 # broken.
397                 curl.close()
398             if not ok:
399                 _logger.debug("Request fail: GET %s => %s: %s",
400                               url, type(self._result['error']), str(self._result['error']))
401                 return None
402             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
403                          self._result['status_code'],
404                          len(self._result['body']),
405                          t.msecs,
406                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
407             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
408             if resp_md5 != locator.md5sum:
409                 _logger.warning("Checksum fail: md5(%s) = %s",
410                                 url, resp_md5)
411                 self._result['error'] = arvados.errors.HttpError(
412                     0, 'Checksum fail')
413                 return None
414             return self._result['body']
415
416         def put(self, hash_s, body, timeout=None):
417             url = self.root + hash_s
418             _logger.debug("Request: PUT %s", url)
419             curl = self._get_user_agent()
420             try:
421                 self._headers = {}
422                 response_body = StringIO.StringIO()
423                 curl.setopt(pycurl.NOSIGNAL, 1)
424                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
425                 curl.setopt(pycurl.URL, url.encode('utf-8'))
426                 curl.setopt(pycurl.POSTFIELDS, body)
427                 curl.setopt(pycurl.CUSTOMREQUEST, 'PUT')
428                 curl.setopt(pycurl.HTTPHEADER, [
429                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
430                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
431                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
432                 self._setcurltimeouts(curl, timeout)
433                 try:
434                     curl.perform()
435                 except Exception as e:
436                     raise arvados.errors.HttpError(0, str(e))
437                 self._result = {
438                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
439                     'body': response_body.getvalue(),
440                     'headers': self._headers,
441                     'error': False,
442                 }
443                 ok = retry.check_http_response_success(self._result['status_code'])
444                 if not ok:
445                     self._result['error'] = arvados.errors.HttpError(
446                         self._result['status_code'],
447                         self._headers.get('x-status-line', 'Error'))
448             except self.HTTP_ERRORS as e:
449                 self._result = {
450                     'error': e,
451                 }
452                 ok = False
453             self._usable = ok != False # still usable if ok is True or None
454             if self._result.get('status_code', None):
455                 # Client is functional. See comment in get().
456                 self._put_user_agent(curl)
457             else:
458                 curl.close()
459             if not ok:
460                 _logger.debug("Request fail: PUT %s => %s: %s",
461                               url, type(self._result['error']), str(self._result['error']))
462                 return False
463             return True
464
465         def _setcurltimeouts(self, curl, timeouts):
466             if not timeouts:
467                 return
468             elif isinstance(timeouts, tuple):
469                 conn_t, xfer_t = timeouts
470             else:
471                 conn_t, xfer_t = (timeouts, timeouts)
472             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
473             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
474
475         def _headerfunction(self, header_line):
476             header_line = header_line.decode('iso-8859-1')
477             if ':' in header_line:
478                 name, value = header_line.split(':', 1)
479                 name = name.strip().lower()
480                 value = value.strip()
481             elif self._headers:
482                 name = self._lastheadername
483                 value = self._headers[name] + ' ' + header_line.strip()
484             elif header_line.startswith('HTTP/'):
485                 name = 'x-status-line'
486                 value = header_line
487             else:
488                 _logger.error("Unexpected header line: %s", header_line)
489                 return
490             self._lastheadername = name
491             self._headers[name] = value
492             # Returning None implies all bytes were written
493
494
495     class KeepWriterThread(threading.Thread):
496         """
497         Write a blob of data to the given Keep server. On success, call
498         save_response() of the given ThreadLimiter to save the returned
499         locator.
500         """
501         def __init__(self, keep_service, **kwargs):
502             super(KeepClient.KeepWriterThread, self).__init__()
503             self.service = keep_service
504             self.args = kwargs
505             self._success = False
506
507         def success(self):
508             return self._success
509
510         def run(self):
511             with self.args['thread_limiter'] as limiter:
512                 if not limiter.shall_i_proceed():
513                     # My turn arrived, but the job has been done without
514                     # me.
515                     return
516                 self.run_with_limiter(limiter)
517
518         def run_with_limiter(self, limiter):
519             if self.service.finished():
520                 return
521             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
522                           str(threading.current_thread()),
523                           self.args['data_hash'],
524                           len(self.args['data']),
525                           self.args['service_root'])
526             self._success = bool(self.service.put(
527                 self.args['data_hash'],
528                 self.args['data'],
529                 timeout=self.args.get('timeout', None)))
530             result = self.service.last_result()
531             if self._success:
532                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
533                               str(threading.current_thread()),
534                               self.args['data_hash'],
535                               len(self.args['data']),
536                               self.args['service_root'])
537                 # Tick the 'done' counter for the number of replica
538                 # reported stored by the server, for the case that
539                 # we're talking to a proxy or other backend that
540                 # stores to multiple copies for us.
541                 try:
542                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
543                 except (KeyError, ValueError):
544                     replicas_stored = 1
545                 limiter.save_response(result['body'].strip(), replicas_stored)
546             elif result.get('status_code', None):
547                 _logger.debug("Request fail: PUT %s => %s %s",
548                               self.args['data_hash'],
549                               result['status_code'],
550                               result['body'])
551
552
553     def __init__(self, api_client=None, proxy=None,
554                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
555                  api_token=None, local_store=None, block_cache=None,
556                  num_retries=0, session=None):
557         """Initialize a new KeepClient.
558
559         Arguments:
560         :api_client:
561           The API client to use to find Keep services.  If not
562           provided, KeepClient will build one from available Arvados
563           configuration.
564
565         :proxy:
566           If specified, this KeepClient will send requests to this Keep
567           proxy.  Otherwise, KeepClient will fall back to the setting of the
568           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
569           KeepClient does not use a proxy, pass in an empty string.
570
571         :timeout:
572           The initial timeout (in seconds) for HTTP requests to Keep
573           non-proxy servers.  A tuple of two floats is interpreted as
574           (connection_timeout, read_timeout): see
575           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
576           Because timeouts are often a result of transient server load, the
577           actual connection timeout will be increased by a factor of two on
578           each retry.
579           Default: (2, 300).
580
581         :proxy_timeout:
582           The initial timeout (in seconds) for HTTP requests to
583           Keep proxies. A tuple of two floats is interpreted as
584           (connection_timeout, read_timeout). The behavior described
585           above for adjusting connection timeouts on retry also applies.
586           Default: (20, 300).
587
588         :api_token:
589           If you're not using an API client, but only talking
590           directly to a Keep proxy, this parameter specifies an API token
591           to authenticate Keep requests.  It is an error to specify both
592           api_client and api_token.  If you specify neither, KeepClient
593           will use one available from the Arvados configuration.
594
595         :local_store:
596           If specified, this KeepClient will bypass Keep
597           services, and save data to the named directory.  If unspecified,
598           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
599           environment variable.  If you want to ensure KeepClient does not
600           use local storage, pass in an empty string.  This is primarily
601           intended to mock a server for testing.
602
603         :num_retries:
604           The default number of times to retry failed requests.
605           This will be used as the default num_retries value when get() and
606           put() are called.  Default 0.
607         """
608         self.lock = threading.Lock()
609         if proxy is None:
610             proxy = config.get('ARVADOS_KEEP_PROXY')
611         if api_token is None:
612             if api_client is None:
613                 api_token = config.get('ARVADOS_API_TOKEN')
614             else:
615                 api_token = api_client.api_token
616         elif api_client is not None:
617             raise ValueError(
618                 "can't build KeepClient with both API client and token")
619         if local_store is None:
620             local_store = os.environ.get('KEEP_LOCAL_STORE')
621
622         self.block_cache = block_cache if block_cache else KeepBlockCache()
623         self.timeout = timeout
624         self.proxy_timeout = proxy_timeout
625         self._user_agent_pool = Queue.LifoQueue()
626
627         if local_store:
628             self.local_store = local_store
629             self.get = self.local_store_get
630             self.put = self.local_store_put
631         else:
632             self.num_retries = num_retries
633             if proxy:
634                 if not proxy.endswith('/'):
635                     proxy += '/'
636                 self.api_token = api_token
637                 self._gateway_services = {}
638                 self._keep_services = [{
639                     'uuid': 'proxy',
640                     '_service_root': proxy,
641                     }]
642                 self.using_proxy = True
643                 self._static_services_list = True
644             else:
645                 # It's important to avoid instantiating an API client
646                 # unless we actually need one, for testing's sake.
647                 if api_client is None:
648                     api_client = arvados.api('v1')
649                 self.api_client = api_client
650                 self.api_token = api_client.api_token
651                 self._gateway_services = {}
652                 self._keep_services = None
653                 self.using_proxy = None
654                 self._static_services_list = False
655
656     def current_timeout(self, attempt_number):
657         """Return the appropriate timeout to use for this client.
658
659         The proxy timeout setting if the backend service is currently a proxy,
660         the regular timeout setting otherwise.  The `attempt_number` indicates
661         how many times the operation has been tried already (starting from 0
662         for the first try), and scales the connection timeout portion of the
663         return value accordingly.
664
665         """
666         # TODO(twp): the timeout should be a property of a
667         # KeepService, not a KeepClient. See #4488.
668         t = self.proxy_timeout if self.using_proxy else self.timeout
669         return (t[0] * (1 << attempt_number), t[1])
670
671     def build_services_list(self, force_rebuild=False):
672         if (self._static_services_list or
673               (self._keep_services and not force_rebuild)):
674             return
675         with self.lock:
676             try:
677                 keep_services = self.api_client.keep_services().accessible()
678             except Exception:  # API server predates Keep services.
679                 keep_services = self.api_client.keep_disks().list()
680
681             accessible = keep_services.execute().get('items')
682             if not accessible:
683                 raise arvados.errors.NoKeepServersError()
684
685             # Precompute the base URI for each service.
686             for r in accessible:
687                 host = r['service_host']
688                 if not host.startswith('[') and host.find(':') >= 0:
689                     # IPv6 URIs must be formatted like http://[::1]:80/...
690                     host = '[' + host + ']'
691                 r['_service_root'] = "{}://{}:{:d}/".format(
692                     'https' if r['service_ssl_flag'] else 'http',
693                     host,
694                     r['service_port'])
695
696             # Gateway services are only used when specified by UUID,
697             # so there's nothing to gain by filtering them by
698             # service_type.
699             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
700             _logger.debug(str(self._gateway_services))
701
702             self._keep_services = [
703                 ks for ks in accessible
704                 if ks.get('service_type') in ['disk', 'proxy']]
705             _logger.debug(str(self._keep_services))
706
707             self.using_proxy = any(ks.get('service_type') == 'proxy'
708                                    for ks in self._keep_services)
709
710     def _service_weight(self, data_hash, service_uuid):
711         """Compute the weight of a Keep service endpoint for a data
712         block with a known hash.
713
714         The weight is md5(h + u) where u is the last 15 characters of
715         the service endpoint's UUID.
716         """
717         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
718
719     def weighted_service_roots(self, locator, force_rebuild=False):
720         """Return an array of Keep service endpoints, in the order in
721         which they should be probed when reading or writing data with
722         the given hash+hints.
723         """
724         self.build_services_list(force_rebuild)
725
726         sorted_roots = []
727
728         # Use the services indicated by the given +K@... remote
729         # service hints, if any are present and can be resolved to a
730         # URI.
731         for hint in locator.hints:
732             if hint.startswith('K@'):
733                 if len(hint) == 7:
734                     sorted_roots.append(
735                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
736                 elif len(hint) == 29:
737                     svc = self._gateway_services.get(hint[2:])
738                     if svc:
739                         sorted_roots.append(svc['_service_root'])
740
741         # Sort the available local services by weight (heaviest first)
742         # for this locator, and return their service_roots (base URIs)
743         # in that order.
744         sorted_roots.extend([
745             svc['_service_root'] for svc in sorted(
746                 self._keep_services,
747                 reverse=True,
748                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
749         _logger.debug("{}: {}".format(locator, sorted_roots))
750         return sorted_roots
751
752     def map_new_services(self, roots_map, locator, force_rebuild, **headers):
753         # roots_map is a dictionary, mapping Keep service root strings
754         # to KeepService objects.  Poll for Keep services, and add any
755         # new ones to roots_map.  Return the current list of local
756         # root strings.
757         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
758         local_roots = self.weighted_service_roots(locator, force_rebuild)
759         for root in local_roots:
760             if root not in roots_map:
761                 roots_map[root] = self.KeepService(
762                     root, self._user_agent_pool, **headers)
763         return local_roots
764
765     @staticmethod
766     def _check_loop_result(result):
767         # KeepClient RetryLoops should save results as a 2-tuple: the
768         # actual result of the request, and the number of servers available
769         # to receive the request this round.
770         # This method returns True if there's a real result, False if
771         # there are no more servers available, otherwise None.
772         if isinstance(result, Exception):
773             return None
774         result, tried_server_count = result
775         if (result is not None) and (result is not False):
776             return True
777         elif tried_server_count < 1:
778             _logger.info("No more Keep services to try; giving up")
779             return False
780         else:
781             return None
782
783     def get_from_cache(self, loc):
784         """Fetch a block only if is in the cache, otherwise return None."""
785         slot = self.block_cache.get(loc)
786         if slot is not None and slot.ready.is_set():
787             return slot.get()
788         else:
789             return None
790
791     @retry.retry_method
792     def get(self, loc_s, num_retries=None):
793         """Get data from Keep.
794
795         This method fetches one or more blocks of data from Keep.  It
796         sends a request each Keep service registered with the API
797         server (or the proxy provided when this client was
798         instantiated), then each service named in location hints, in
799         sequence.  As soon as one service provides the data, it's
800         returned.
801
802         Arguments:
803         * loc_s: A string of one or more comma-separated locators to fetch.
804           This method returns the concatenation of these blocks.
805         * num_retries: The number of times to retry GET requests to
806           *each* Keep server if it returns temporary failures, with
807           exponential backoff.  Note that, in each loop, the method may try
808           to fetch data from every available Keep service, along with any
809           that are named in location hints in the locator.  The default value
810           is set when the KeepClient is initialized.
811         """
812         if ',' in loc_s:
813             return ''.join(self.get(x) for x in loc_s.split(','))
814         locator = KeepLocator(loc_s)
815         slot, first = self.block_cache.reserve_cache(locator.md5sum)
816         if not first:
817             v = slot.get()
818             return v
819
820         # If the locator has hints specifying a prefix (indicating a
821         # remote keepproxy) or the UUID of a local gateway service,
822         # read data from the indicated service(s) instead of the usual
823         # list of local disk services.
824         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
825                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
826         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
827                            for hint in locator.hints if (
828                                    hint.startswith('K@') and
829                                    len(hint) == 29 and
830                                    self._gateway_services.get(hint[2:])
831                                    )])
832         # Map root URLs to their KeepService objects.
833         roots_map = {
834             root: self.KeepService(root, self._user_agent_pool)
835             for root in hint_roots
836         }
837
838         # See #3147 for a discussion of the loop implementation.  Highlights:
839         # * Refresh the list of Keep services after each failure, in case
840         #   it's being updated.
841         # * Retry until we succeed, we're out of retries, or every available
842         #   service has returned permanent failure.
843         sorted_roots = []
844         roots_map = {}
845         blob = None
846         loop = retry.RetryLoop(num_retries, self._check_loop_result,
847                                backoff_start=2)
848         for tries_left in loop:
849             try:
850                 sorted_roots = self.map_new_services(
851                     roots_map, locator,
852                     force_rebuild=(tries_left < num_retries))
853             except Exception as error:
854                 loop.save_result(error)
855                 continue
856
857             # Query KeepService objects that haven't returned
858             # permanent failure, in our specified shuffle order.
859             services_to_try = [roots_map[root]
860                                for root in sorted_roots
861                                if roots_map[root].usable()]
862             for keep_service in services_to_try:
863                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
864                 if blob is not None:
865                     break
866             loop.save_result((blob, len(services_to_try)))
867
868         # Always cache the result, then return it if we succeeded.
869         slot.set(blob)
870         self.block_cache.cap_cache()
871         if loop.success():
872             return blob
873
874         # Q: Including 403 is necessary for the Keep tests to continue
875         # passing, but maybe they should expect KeepReadError instead?
876         not_founds = sum(1 for key in sorted_roots
877                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
878         service_errors = ((key, roots_map[key].last_result()['error'])
879                           for key in sorted_roots)
880         if not roots_map:
881             raise arvados.errors.KeepReadError(
882                 "failed to read {}: no Keep services available ({})".format(
883                     loc_s, loop.last_result()))
884         elif not_founds == len(sorted_roots):
885             raise arvados.errors.NotFoundError(
886                 "{} not found".format(loc_s), service_errors)
887         else:
888             raise arvados.errors.KeepReadError(
889                 "failed to read {}".format(loc_s), service_errors, label="service")
890
891     @retry.retry_method
892     def put(self, data, copies=2, num_retries=None):
893         """Save data in Keep.
894
895         This method will get a list of Keep services from the API server, and
896         send the data to each one simultaneously in a new thread.  Once the
897         uploads are finished, if enough copies are saved, this method returns
898         the most recent HTTP response body.  If requests fail to upload
899         enough copies, this method raises KeepWriteError.
900
901         Arguments:
902         * data: The string of data to upload.
903         * copies: The number of copies that the user requires be saved.
904           Default 2.
905         * num_retries: The number of times to retry PUT requests to
906           *each* Keep server if it returns temporary failures, with
907           exponential backoff.  The default value is set when the
908           KeepClient is initialized.
909         """
910
911         if isinstance(data, unicode):
912             data = data.encode("ascii")
913         elif not isinstance(data, str):
914             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put must be type 'str'")
915
916         data_hash = hashlib.md5(data).hexdigest()
917         if copies < 1:
918             return data_hash
919         locator = KeepLocator(data_hash + '+' + str(len(data)))
920
921         headers = {}
922         if self.using_proxy:
923             # Tell the proxy how many copies we want it to store
924             headers['X-Keep-Desired-Replication'] = str(copies)
925         roots_map = {}
926         thread_limiter = KeepClient.ThreadLimiter(copies)
927         loop = retry.RetryLoop(num_retries, self._check_loop_result,
928                                backoff_start=2)
929         for tries_left in loop:
930             try:
931                 local_roots = self.map_new_services(
932                     roots_map, locator,
933                     force_rebuild=(tries_left < num_retries), **headers)
934             except Exception as error:
935                 loop.save_result(error)
936                 continue
937
938             threads = []
939             for service_root, ks in roots_map.iteritems():
940                 if ks.finished():
941                     continue
942                 t = KeepClient.KeepWriterThread(
943                     ks,
944                     data=data,
945                     data_hash=data_hash,
946                     service_root=service_root,
947                     thread_limiter=thread_limiter,
948                     timeout=self.current_timeout(num_retries-tries_left))
949                 t.start()
950                 threads.append(t)
951             for t in threads:
952                 t.join()
953             loop.save_result((thread_limiter.done() >= copies, len(threads)))
954
955         if loop.success():
956             return thread_limiter.response()
957         if not roots_map:
958             raise arvados.errors.KeepWriteError(
959                 "failed to write {}: no Keep services available ({})".format(
960                     data_hash, loop.last_result()))
961         else:
962             service_errors = ((key, roots_map[key].last_result()['error'])
963                               for key in local_roots
964                               if roots_map[key].last_result()['error'])
965             raise arvados.errors.KeepWriteError(
966                 "failed to write {} (wanted {} copies but wrote {})".format(
967                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
968
969     def local_store_put(self, data, copies=1, num_retries=None):
970         """A stub for put().
971
972         This method is used in place of the real put() method when
973         using local storage (see constructor's local_store argument).
974
975         copies and num_retries arguments are ignored: they are here
976         only for the sake of offering the same call signature as
977         put().
978
979         Data stored this way can be retrieved via local_store_get().
980         """
981         md5 = hashlib.md5(data).hexdigest()
982         locator = '%s+%d' % (md5, len(data))
983         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
984             f.write(data)
985         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
986                   os.path.join(self.local_store, md5))
987         return locator
988
989     def local_store_get(self, loc_s, num_retries=None):
990         """Companion to local_store_put()."""
991         try:
992             locator = KeepLocator(loc_s)
993         except ValueError:
994             raise arvados.errors.NotFoundError(
995                 "Invalid data locator: '%s'" % loc_s)
996         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
997             return ''
998         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
999             return f.read()
1000
1001     def is_cached(self, locator):
1002         return self.block_cache.reserve_cache(expect_hash)