2853: Use rendezvous hashing to select probe order in Python library.
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import timer
21 import datetime
22 import ssl
23 import socket
24
25 import arvados
26 import arvados.config as config
27 import arvados.errors
28 import arvados.retry as retry
29 import arvados.util
30
31 _logger = logging.getLogger('arvados.keep')
32 global_client_object = None
33
34 class KeepLocator(object):
35     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
36     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37
38     def __init__(self, locator_str):
39         self.hints = []
40         self._perm_sig = None
41         self._perm_expiry = None
42         pieces = iter(locator_str.split('+'))
43         self.md5sum = next(pieces)
44         try:
45             self.size = int(next(pieces))
46         except StopIteration:
47             self.size = None
48         for hint in pieces:
49             if self.HINT_RE.match(hint) is None:
50                 raise ValueError("unrecognized hint data {}".format(hint))
51             elif hint.startswith('A'):
52                 self.parse_permission_hint(hint)
53             else:
54                 self.hints.append(hint)
55
56     def __str__(self):
57         return '+'.join(
58             str(s) for s in [self.md5sum, self.size,
59                              self.permission_hint()] + self.hints
60             if s is not None)
61
62     def _make_hex_prop(name, length):
63         # Build and return a new property with the given name that
64         # must be a hex string of the given length.
65         data_name = '_{}'.format(name)
66         def getter(self):
67             return getattr(self, data_name)
68         def setter(self, hex_str):
69             if not arvados.util.is_hex(hex_str, length):
70                 raise ValueError("{} must be a {}-digit hex string: {}".
71                                  format(name, length, hex_str))
72             setattr(self, data_name, hex_str)
73         return property(getter, setter)
74
75     md5sum = _make_hex_prop('md5sum', 32)
76     perm_sig = _make_hex_prop('perm_sig', 40)
77
78     @property
79     def perm_expiry(self):
80         return self._perm_expiry
81
82     @perm_expiry.setter
83     def perm_expiry(self, value):
84         if not arvados.util.is_hex(value, 1, 8):
85             raise ValueError(
86                 "permission timestamp must be a hex Unix timestamp: {}".
87                 format(value))
88         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
89
90     def permission_hint(self):
91         data = [self.perm_sig, self.perm_expiry]
92         if None in data:
93             return None
94         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
95         return "A{}@{:08x}".format(*data)
96
97     def parse_permission_hint(self, s):
98         try:
99             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
100         except IndexError:
101             raise ValueError("bad permission hint {}".format(s))
102
103     def permission_expired(self, as_of_dt=None):
104         if self.perm_expiry is None:
105             return False
106         elif as_of_dt is None:
107             as_of_dt = datetime.datetime.now()
108         return self.perm_expiry <= as_of_dt
109
110
111 class Keep(object):
112     """Simple interface to a global KeepClient object.
113
114     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
115     own API client.  The global KeepClient will build an API client from the
116     current Arvados configuration, which may not match the one you built.
117     """
118     _last_key = None
119
120     @classmethod
121     def global_client_object(cls):
122         global global_client_object
123         # Previously, KeepClient would change its behavior at runtime based
124         # on these configuration settings.  We simulate that behavior here
125         # by checking the values and returning a new KeepClient if any of
126         # them have changed.
127         key = (config.get('ARVADOS_API_HOST'),
128                config.get('ARVADOS_API_TOKEN'),
129                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
130                config.get('ARVADOS_KEEP_PROXY'),
131                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
132                os.environ.get('KEEP_LOCAL_STORE'))
133         if (global_client_object is None) or (cls._last_key != key):
134             global_client_object = KeepClient()
135             cls._last_key = key
136         return global_client_object
137
138     @staticmethod
139     def get(locator, **kwargs):
140         return Keep.global_client_object().get(locator, **kwargs)
141
142     @staticmethod
143     def put(data, **kwargs):
144         return Keep.global_client_object().put(data, **kwargs)
145
146 class KeepBlockCache(object):
147     # Default RAM cache is 256MiB
148     def __init__(self, cache_max=(256 * 1024 * 1024)):
149         self.cache_max = cache_max
150         self._cache = []
151         self._cache_lock = threading.Lock()
152
153     class CacheSlot(object):
154         def __init__(self, locator):
155             self.locator = locator
156             self.ready = threading.Event()
157             self.content = None
158
159         def get(self):
160             self.ready.wait()
161             return self.content
162
163         def set(self, value):
164             self.content = value
165             self.ready.set()
166
167         def size(self):
168             if self.content is None:
169                 return 0
170             else:
171                 return len(self.content)
172
173     def cap_cache(self):
174         '''Cap the cache size to self.cache_max'''
175         self._cache_lock.acquire()
176         try:
177             # Select all slots except those where ready.is_set() and content is
178             # None (that means there was an error reading the block).
179             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
180             sm = sum([slot.size() for slot in self._cache])
181             while len(self._cache) > 0 and sm > self.cache_max:
182                 for i in xrange(len(self._cache)-1, -1, -1):
183                     if self._cache[i].ready.is_set():
184                         del self._cache[i]
185                         break
186                 sm = sum([slot.size() for slot in self._cache])
187         finally:
188             self._cache_lock.release()
189
190     def reserve_cache(self, locator):
191         '''Reserve a cache slot for the specified locator,
192         or return the existing slot.'''
193         self._cache_lock.acquire()
194         try:
195             # Test if the locator is already in the cache
196             for i in xrange(0, len(self._cache)):
197                 if self._cache[i].locator == locator:
198                     n = self._cache[i]
199                     if i != 0:
200                         # move it to the front
201                         del self._cache[i]
202                         self._cache.insert(0, n)
203                     return n, False
204
205             # Add a new cache slot for the locator
206             n = KeepBlockCache.CacheSlot(locator)
207             self._cache.insert(0, n)
208             return n, True
209         finally:
210             self._cache_lock.release()
211
212 class KeepClient(object):
213     class ThreadLimiter(object):
214         """
215         Limit the number of threads running at a given time to
216         {desired successes} minus {successes reported}. When successes
217         reported == desired, wake up the remaining threads and tell
218         them to quit.
219
220         Should be used in a "with" block.
221         """
222         def __init__(self, todo):
223             self._todo = todo
224             self._done = 0
225             self._response = None
226             self._todo_lock = threading.Semaphore(todo)
227             self._done_lock = threading.Lock()
228
229         def __enter__(self):
230             self._todo_lock.acquire()
231             return self
232
233         def __exit__(self, type, value, traceback):
234             self._todo_lock.release()
235
236         def shall_i_proceed(self):
237             """
238             Return true if the current thread should do stuff. Return
239             false if the current thread should just stop.
240             """
241             with self._done_lock:
242                 return (self._done < self._todo)
243
244         def save_response(self, response_body, replicas_stored):
245             """
246             Records a response body (a locator, possibly signed) returned by
247             the Keep server.  It is not necessary to save more than
248             one response, since we presume that any locator returned
249             in response to a successful request is valid.
250             """
251             with self._done_lock:
252                 self._done += replicas_stored
253                 self._response = response_body
254
255         def response(self):
256             """
257             Returns the body from the response to a PUT request.
258             """
259             with self._done_lock:
260                 return self._response
261
262         def done(self):
263             """
264             Return how many successes were reported.
265             """
266             with self._done_lock:
267                 return self._done
268
269
270     class KeepService(object):
271         # Make requests to a single Keep service, and track results.
272         HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
273                        socket.error, ssl.SSLError)
274
275         def __init__(self, root, **headers):
276             self.root = root
277             self.last_result = None
278             self.success_flag = None
279             self.get_headers = {'Accept': 'application/octet-stream'}
280             self.get_headers.update(headers)
281             self.put_headers = headers
282
283         def usable(self):
284             return self.success_flag is not False
285
286         def finished(self):
287             return self.success_flag is not None
288
289         def last_status(self):
290             try:
291                 return int(self.last_result[0].status)
292             except (AttributeError, IndexError, ValueError, TypeError):
293                 return None
294
295         def get(self, http, locator):
296             # http is an httplib2.Http object.
297             # locator is a KeepLocator object.
298             url = self.root + str(locator)
299             _logger.debug("Request: GET %s", url)
300             try:
301                 with timer.Timer() as t:
302                     result = http.request(url.encode('utf-8'), 'GET',
303                                           headers=self.get_headers)
304             except self.HTTP_ERRORS as e:
305                 _logger.debug("Request fail: GET %s => %s: %s",
306                               url, type(e), str(e))
307                 self.last_result = e
308             else:
309                 self.last_result = result
310                 self.success_flag = retry.check_http_response_success(result)
311                 content = result[1]
312                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
313                              self.last_status(), len(content), t.msecs,
314                              (len(content)/(1024.0*1024))/t.secs)
315                 if self.success_flag:
316                     resp_md5 = hashlib.md5(content).hexdigest()
317                     if resp_md5 == locator.md5sum:
318                         return content
319                     _logger.warning("Checksum fail: md5(%s) = %s",
320                                     url, resp_md5)
321             return None
322
323         def put(self, http, hash_s, body):
324             url = self.root + hash_s
325             _logger.debug("Request: PUT %s", url)
326             try:
327                 result = http.request(url.encode('utf-8'), 'PUT',
328                                       headers=self.put_headers, body=body)
329             except self.HTTP_ERRORS as e:
330                 _logger.debug("Request fail: PUT %s => %s: %s",
331                               url, type(e), str(e))
332                 self.last_result = e
333             else:
334                 self.last_result = result
335                 self.success_flag = retry.check_http_response_success(result)
336             return self.success_flag
337
338
339     class KeepWriterThread(threading.Thread):
340         """
341         Write a blob of data to the given Keep server. On success, call
342         save_response() of the given ThreadLimiter to save the returned
343         locator.
344         """
345         def __init__(self, keep_service, **kwargs):
346             super(KeepClient.KeepWriterThread, self).__init__()
347             self.service = keep_service
348             self.args = kwargs
349             self._success = False
350
351         def success(self):
352             return self._success
353
354         def run(self):
355             with self.args['thread_limiter'] as limiter:
356                 if not limiter.shall_i_proceed():
357                     # My turn arrived, but the job has been done without
358                     # me.
359                     return
360                 self.run_with_limiter(limiter)
361
362         def run_with_limiter(self, limiter):
363             if self.service.finished():
364                 return
365             _logger.debug("KeepWriterThread %s proceeding %s %s",
366                           str(threading.current_thread()),
367                           self.args['data_hash'],
368                           self.args['service_root'])
369             h = httplib2.Http(timeout=self.args.get('timeout', None))
370             self._success = bool(self.service.put(
371                     h, self.args['data_hash'], self.args['data']))
372             status = self.service.last_status()
373             if self._success:
374                 resp, body = self.service.last_result
375                 _logger.debug("KeepWriterThread %s succeeded %s %s",
376                               str(threading.current_thread()),
377                               self.args['data_hash'],
378                               self.args['service_root'])
379                 # Tick the 'done' counter for the number of replica
380                 # reported stored by the server, for the case that
381                 # we're talking to a proxy or other backend that
382                 # stores to multiple copies for us.
383                 try:
384                     replicas_stored = int(resp['x-keep-replicas-stored'])
385                 except (KeyError, ValueError):
386                     replicas_stored = 1
387                 limiter.save_response(body.strip(), replicas_stored)
388             elif status is not None:
389                 _logger.debug("Request fail: PUT %s => %s %s",
390                               self.args['data_hash'], status,
391                               self.service.last_result[1])
392
393
394     def __init__(self, api_client=None, proxy=None, timeout=300,
395                  api_token=None, local_store=None, block_cache=None,
396                  num_retries=0):
397         """Initialize a new KeepClient.
398
399         Arguments:
400         * api_client: The API client to use to find Keep services.  If not
401           provided, KeepClient will build one from available Arvados
402           configuration.
403         * proxy: If specified, this KeepClient will send requests to this
404           Keep proxy.  Otherwise, KeepClient will fall back to the setting
405           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
406           ensure KeepClient does not use a proxy, pass in an empty string.
407         * timeout: The timeout for all HTTP requests, in seconds.  Default
408           300.
409         * api_token: If you're not using an API client, but only talking
410           directly to a Keep proxy, this parameter specifies an API token
411           to authenticate Keep requests.  It is an error to specify both
412           api_client and api_token.  If you specify neither, KeepClient
413           will use one available from the Arvados configuration.
414         * local_store: If specified, this KeepClient will bypass Keep
415           services, and save data to the named directory.  If unspecified,
416           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
417           environment variable.  If you want to ensure KeepClient does not
418           use local storage, pass in an empty string.  This is primarily
419           intended to mock a server for testing.
420         * num_retries: The default number of times to retry failed requests.
421           This will be used as the default num_retries value when get() and
422           put() are called.  Default 0.
423         """
424         self.lock = threading.Lock()
425         if proxy is None:
426             proxy = config.get('ARVADOS_KEEP_PROXY')
427         if api_token is None:
428             if api_client is None:
429                 api_token = config.get('ARVADOS_API_TOKEN')
430             else:
431                 api_token = api_client.api_token
432         elif api_client is not None:
433             raise ValueError(
434                 "can't build KeepClient with both API client and token")
435         if local_store is None:
436             local_store = os.environ.get('KEEP_LOCAL_STORE')
437
438         self.block_cache = block_cache if block_cache else KeepBlockCache()
439
440         if local_store:
441             self.local_store = local_store
442             self.get = self.local_store_get
443             self.put = self.local_store_put
444         else:
445             self.timeout = timeout
446             self.num_retries = num_retries
447             if proxy:
448                 if not proxy.endswith('/'):
449                     proxy += '/'
450                 self.api_token = api_token
451                 self._keep_services = [{
452                     'uuid': 'proxy',
453                     '_service_root': proxy,
454                     }]
455                 self.using_proxy = True
456                 self._static_services_list = True
457             else:
458                 # It's important to avoid instantiating an API client
459                 # unless we actually need one, for testing's sake.
460                 if api_client is None:
461                     api_client = arvados.api('v1')
462                 self.api_client = api_client
463                 self.api_token = api_client.api_token
464                 self._keep_services = None
465                 self.using_proxy = None
466                 self._static_services_list = False
467
468     def build_services_list(self, force_rebuild=False):
469         if (self._static_services_list or
470               (self._keep_services and not force_rebuild)):
471             return
472         with self.lock:
473             try:
474                 keep_services = self.api_client.keep_services().accessible()
475             except Exception:  # API server predates Keep services.
476                 keep_services = self.api_client.keep_disks().list()
477
478             self._keep_services = keep_services.execute().get('items')
479             if not self._keep_services:
480                 raise arvados.errors.NoKeepServersError()
481
482             self.using_proxy = any(ks.get('service_type') == 'proxy'
483                                    for ks in self._keep_services)
484
485             # Precompute the base URI for each service.
486             for r in self._keep_services:
487                 r['_service_root'] = "{}://[{}]:{:d}/".format(
488                     'https' if r['service_ssl_flag'] else 'http',
489                     r['service_host'],
490                     r['service_port'])
491             _logger.debug(str(self._keep_services))
492
493     def _service_weight(self, hash, service_uuid):
494         """Compute the weight of a Keep service endpoint for a data
495         block with a known hash.
496
497         The weight is md5(h + u) where u is the last 15 characters of
498         the service endpoint's UUID.
499         """
500         return hashlib.md5(hash + service_uuid[-15:]).hexdigest()
501
502     def weighted_service_roots(self, hash, force_rebuild=False):
503         """Return an array of Keep service endpoints, in the order in
504         which they should be probed when reading or writing data with
505         the given hash.
506         """
507         self.build_services_list(force_rebuild)
508
509         # Sort the available services by weight (heaviest first) for
510         # this hash, and return their service_roots (base URIs) in
511         # that order.
512         sorted_roots = [
513             svc['_service_root'] for svc in sorted(
514                 self._keep_services,
515                 reverse=True,
516                 key=lambda svc: self._service_weight(hash, svc['uuid']))]
517         _logger.debug(hash + ': ' + str(sorted_roots))
518         return sorted_roots
519
520     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
521         # roots_map is a dictionary, mapping Keep service root strings
522         # to KeepService objects.  Poll for Keep services, and add any
523         # new ones to roots_map.  Return the current list of local
524         # root strings.
525         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
526         local_roots = self.weighted_service_roots(md5_s, force_rebuild)
527         for root in local_roots:
528             if root not in roots_map:
529                 roots_map[root] = self.KeepService(root, **headers)
530         return local_roots
531
532     @staticmethod
533     def _check_loop_result(result):
534         # KeepClient RetryLoops should save results as a 2-tuple: the
535         # actual result of the request, and the number of servers available
536         # to receive the request this round.
537         # This method returns True if there's a real result, False if
538         # there are no more servers available, otherwise None.
539         if isinstance(result, Exception):
540             return None
541         result, tried_server_count = result
542         if (result is not None) and (result is not False):
543             return True
544         elif tried_server_count < 1:
545             _logger.info("No more Keep services to try; giving up")
546             return False
547         else:
548             return None
549
550     @retry.retry_method
551     def get(self, loc_s, num_retries=None):
552         """Get data from Keep.
553
554         This method fetches one or more blocks of data from Keep.  It
555         sends a request each Keep service registered with the API
556         server (or the proxy provided when this client was
557         instantiated), then each service named in location hints, in
558         sequence.  As soon as one service provides the data, it's
559         returned.
560
561         Arguments:
562         * loc_s: A string of one or more comma-separated locators to fetch.
563           This method returns the concatenation of these blocks.
564         * num_retries: The number of times to retry GET requests to
565           *each* Keep server if it returns temporary failures, with
566           exponential backoff.  Note that, in each loop, the method may try
567           to fetch data from every available Keep service, along with any
568           that are named in location hints in the locator.  The default value
569           is set when the KeepClient is initialized.
570         """
571         if ',' in loc_s:
572             return ''.join(self.get(x) for x in loc_s.split(','))
573         locator = KeepLocator(loc_s)
574         expect_hash = locator.md5sum
575
576         slot, first = self.block_cache.reserve_cache(expect_hash)
577         if not first:
578             v = slot.get()
579             return v
580
581         # See #3147 for a discussion of the loop implementation.  Highlights:
582         # * Refresh the list of Keep services after each failure, in case
583         #   it's being updated.
584         # * Retry until we succeed, we're out of retries, or every available
585         #   service has returned permanent failure.
586         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
587                       for hint in locator.hints if hint.startswith('K@')]
588         # Map root URLs their KeepService objects.
589         roots_map = {root: self.KeepService(root) for root in hint_roots}
590         blob = None
591         loop = retry.RetryLoop(num_retries, self._check_loop_result,
592                                backoff_start=2)
593         for tries_left in loop:
594             try:
595                 local_roots = self.map_new_services(
596                     roots_map, expect_hash,
597                     force_rebuild=(tries_left < num_retries))
598             except Exception as error:
599                 loop.save_result(error)
600                 continue
601
602             # Query KeepService objects that haven't returned
603             # permanent failure, in our specified shuffle order.
604             services_to_try = [roots_map[root]
605                                for root in (local_roots + hint_roots)
606                                if roots_map[root].usable()]
607             http = httplib2.Http(timeout=self.timeout)
608             for keep_service in services_to_try:
609                 blob = keep_service.get(http, locator)
610                 if blob is not None:
611                     break
612             loop.save_result((blob, len(services_to_try)))
613
614         # Always cache the result, then return it if we succeeded.
615         slot.set(blob)
616         self.block_cache.cap_cache()
617         if loop.success():
618             return blob
619
620         # No servers fulfilled the request.  Count how many responded
621         # "not found;" if the ratio is high enough (currently 75%), report
622         # Not Found; otherwise a generic error.
623         # Q: Including 403 is necessary for the Keep tests to continue
624         # passing, but maybe they should expect KeepReadError instead?
625         not_founds = sum(1 for ks in roots_map.values()
626                          if ks.last_status() in set([403, 404, 410]))
627         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
628             raise arvados.errors.NotFoundError(loc_s)
629         else:
630             raise arvados.errors.KeepReadError(loc_s)
631
632     @retry.retry_method
633     def put(self, data, copies=2, num_retries=None):
634         """Save data in Keep.
635
636         This method will get a list of Keep services from the API server, and
637         send the data to each one simultaneously in a new thread.  Once the
638         uploads are finished, if enough copies are saved, this method returns
639         the most recent HTTP response body.  If requests fail to upload
640         enough copies, this method raises KeepWriteError.
641
642         Arguments:
643         * data: The string of data to upload.
644         * copies: The number of copies that the user requires be saved.
645           Default 2.
646         * num_retries: The number of times to retry PUT requests to
647           *each* Keep server if it returns temporary failures, with
648           exponential backoff.  The default value is set when the
649           KeepClient is initialized.
650         """
651         data_hash = hashlib.md5(data).hexdigest()
652         if copies < 1:
653             return data_hash
654
655         headers = {}
656         if self.using_proxy:
657             # Tell the proxy how many copies we want it to store
658             headers['X-Keep-Desired-Replication'] = str(copies)
659         roots_map = {}
660         thread_limiter = KeepClient.ThreadLimiter(copies)
661         loop = retry.RetryLoop(num_retries, self._check_loop_result,
662                                backoff_start=2)
663         for tries_left in loop:
664             try:
665                 local_roots = self.map_new_services(
666                     roots_map, data_hash,
667                     force_rebuild=(tries_left < num_retries), **headers)
668             except Exception as error:
669                 loop.save_result(error)
670                 continue
671
672             threads = []
673             for service_root, ks in roots_map.iteritems():
674                 if ks.finished():
675                     continue
676                 t = KeepClient.KeepWriterThread(
677                     ks,
678                     data=data,
679                     data_hash=data_hash,
680                     service_root=service_root,
681                     thread_limiter=thread_limiter,
682                     timeout=self.timeout)
683                 t.start()
684                 threads.append(t)
685             for t in threads:
686                 t.join()
687             loop.save_result((thread_limiter.done() >= copies, len(threads)))
688
689         if loop.success():
690             return thread_limiter.response()
691         raise arvados.errors.KeepWriteError(
692             "Write fail for %s: wanted %d but wrote %d" %
693             (data_hash, copies, thread_limiter.done()))
694
695     # Local storage methods need no-op num_retries arguments to keep
696     # integration tests happy.  With better isolation they could
697     # probably be removed again.
698     def local_store_put(self, data, num_retries=0):
699         md5 = hashlib.md5(data).hexdigest()
700         locator = '%s+%d' % (md5, len(data))
701         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
702             f.write(data)
703         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
704                   os.path.join(self.local_store, md5))
705         return locator
706
707     def local_store_get(self, loc_s, num_retries=0):
708         try:
709             locator = KeepLocator(loc_s)
710         except ValueError:
711             raise arvados.errors.NotFoundError(
712                 "Invalid data locator: '%s'" % loc_s)
713         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
714             return ''
715         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
716             return f.read()