3147: Add retry support to Python SDK's KeepClient.
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import timer
21 import datetime
22 import ssl
23
24 _logger = logging.getLogger('arvados.keep')
25 global_client_object = None
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.retry as retry
31 import arvados.util
32
33 class KeepLocator(object):
34     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
35     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
36
37     def __init__(self, locator_str):
38         self.hints = []
39         self._perm_sig = None
40         self._perm_expiry = None
41         pieces = iter(locator_str.split('+'))
42         self.md5sum = next(pieces)
43         try:
44             self.size = int(next(pieces))
45         except StopIteration:
46             self.size = None
47         for hint in pieces:
48             if self.HINT_RE.match(hint) is None:
49                 raise ValueError("unrecognized hint data {}".format(hint))
50             elif hint.startswith('A'):
51                 self.parse_permission_hint(hint)
52             else:
53                 self.hints.append(hint)
54
55     def __str__(self):
56         return '+'.join(
57             str(s) for s in [self.md5sum, self.size,
58                              self.permission_hint()] + self.hints
59             if s is not None)
60
61     def _make_hex_prop(name, length):
62         # Build and return a new property with the given name that
63         # must be a hex string of the given length.
64         data_name = '_{}'.format(name)
65         def getter(self):
66             return getattr(self, data_name)
67         def setter(self, hex_str):
68             if not arvados.util.is_hex(hex_str, length):
69                 raise ValueError("{} must be a {}-digit hex string: {}".
70                                  format(name, length, hex_str))
71             setattr(self, data_name, hex_str)
72         return property(getter, setter)
73
74     md5sum = _make_hex_prop('md5sum', 32)
75     perm_sig = _make_hex_prop('perm_sig', 40)
76
77     @property
78     def perm_expiry(self):
79         return self._perm_expiry
80
81     @perm_expiry.setter
82     def perm_expiry(self, value):
83         if not arvados.util.is_hex(value, 1, 8):
84             raise ValueError(
85                 "permission timestamp must be a hex Unix timestamp: {}".
86                 format(value))
87         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
88
89     def permission_hint(self):
90         data = [self.perm_sig, self.perm_expiry]
91         if None in data:
92             return None
93         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
94         return "A{}@{:08x}".format(*data)
95
96     def parse_permission_hint(self, s):
97         try:
98             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
99         except IndexError:
100             raise ValueError("bad permission hint {}".format(s))
101
102     def permission_expired(self, as_of_dt=None):
103         if self.perm_expiry is None:
104             return False
105         elif as_of_dt is None:
106             as_of_dt = datetime.datetime.now()
107         return self.perm_expiry <= as_of_dt
108
109
110 class Keep(object):
111     """Simple interface to a global KeepClient object.
112
113     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
114     own API client.  The global KeepClient will build an API client from the
115     current Arvados configuration, which may not match the one you built.
116     """
117     _last_key = None
118
119     @classmethod
120     def global_client_object(cls):
121         global global_client_object
122         # Previously, KeepClient would change its behavior at runtime based
123         # on these configuration settings.  We simulate that behavior here
124         # by checking the values and returning a new KeepClient if any of
125         # them have changed.
126         key = (config.get('ARVADOS_API_HOST'),
127                config.get('ARVADOS_API_TOKEN'),
128                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
129                config.get('ARVADOS_KEEP_PROXY'),
130                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
131                os.environ.get('KEEP_LOCAL_STORE'))
132         if (global_client_object is None) or (cls._last_key != key):
133             global_client_object = KeepClient()
134             cls._last_key = key
135         return global_client_object
136
137     @staticmethod
138     def get(locator, **kwargs):
139         return Keep.global_client_object().get(locator, **kwargs)
140
141     @staticmethod
142     def put(data, **kwargs):
143         return Keep.global_client_object().put(data, **kwargs)
144
145
146 class KeepClient(object):
147     class ThreadLimiter(object):
148         """
149         Limit the number of threads running at a given time to
150         {desired successes} minus {successes reported}. When successes
151         reported == desired, wake up the remaining threads and tell
152         them to quit.
153
154         Should be used in a "with" block.
155         """
156         def __init__(self, todo):
157             self._todo = todo
158             self._done = 0
159             self._response = None
160             self._todo_lock = threading.Semaphore(todo)
161             self._done_lock = threading.Lock()
162
163         def __enter__(self):
164             self._todo_lock.acquire()
165             return self
166
167         def __exit__(self, type, value, traceback):
168             self._todo_lock.release()
169
170         def shall_i_proceed(self):
171             """
172             Return true if the current thread should do stuff. Return
173             false if the current thread should just stop.
174             """
175             with self._done_lock:
176                 return (self._done < self._todo)
177
178         def save_response(self, response_body, replicas_stored):
179             """
180             Records a response body (a locator, possibly signed) returned by
181             the Keep server.  It is not necessary to save more than
182             one response, since we presume that any locator returned
183             in response to a successful request is valid.
184             """
185             with self._done_lock:
186                 self._done += replicas_stored
187                 self._response = response_body
188
189         def response(self):
190             """
191             Returns the body from the response to a PUT request.
192             """
193             with self._done_lock:
194                 return self._response
195
196         def done(self):
197             """
198             Return how many successes were reported.
199             """
200             with self._done_lock:
201                 return self._done
202
203
204     class KeepService(object):
205         # Make requests to a single Keep service, and track results.
206         HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
207                        ssl.SSLError)
208
209         def __init__(self, root, **headers):
210             self.root = root
211             self.last_result = None
212             self.success_flag = None
213             self.get_headers = {'Accept': 'application/octet-stream'}
214             self.get_headers.update(headers)
215             self.put_headers = headers
216
217         def usable(self):
218             return self.success_flag is not False
219
220         def finished(self):
221             return self.success_flag is not None
222
223         def last_status(self):
224             try:
225                 return int(self.last_result[0].status)
226             except (AttributeError, IndexError, ValueError):
227                 return None
228
229         def get(self, http, locator):
230             # http is an httplib2.Http object.
231             # locator is a KeepLocator object.
232             url = self.root + str(locator)
233             _logger.debug("Request: GET %s", url)
234             try:
235                 with timer.Timer() as t:
236                     result = http.request(url.encode('utf-8'), 'GET',
237                                           headers=self.get_headers)
238             except self.HTTP_ERRORS as e:
239                 _logger.debug("Request fail: GET %s => %s: %s",
240                               url, type(e), str(e))
241                 self.last_result = e
242             else:
243                 self.last_result = result
244                 self.success_flag = retry.check_http_response_success(result)
245                 content = result[1]
246                 _logger.info("%s response: %s bytes in %s msec (%s MiB/sec)",
247                              self.last_status(), len(content), t.msecs,
248                              (len(content)/(1024*1024))/t.secs)
249                 if self.success_flag:
250                     resp_md5 = hashlib.md5(content).hexdigest()
251                     if resp_md5 == locator.md5sum:
252                         return content
253                     _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
254             return None
255
256         def put(self, http, hash_s, body):
257             url = self.root + hash_s
258             _logger.debug("Request: PUT %s", url)
259             try:
260                 result = http.request(url.encode('utf-8'), 'PUT',
261                                       headers=self.put_headers, body=body)
262             except self.HTTP_ERRORS as e:
263                 _logger.debug("Request fail: PUT %s => %s: %s",
264                               url, type(e), str(e))
265                 self.last_result = e
266             else:
267                 self.last_result = result
268                 self.success_flag = retry.check_http_response_success(result)
269             return self.success_flag
270
271
272     class KeepWriterThread(threading.Thread):
273         """
274         Write a blob of data to the given Keep server. On success, call
275         save_response() of the given ThreadLimiter to save the returned
276         locator.
277         """
278         def __init__(self, keep_service, **kwargs):
279             super(KeepClient.KeepWriterThread, self).__init__()
280             self.service = keep_service
281             self.args = kwargs
282             self._success = False
283
284         def success(self):
285             return self._success
286
287         def run(self):
288             with self.args['thread_limiter'] as limiter:
289                 if not limiter.shall_i_proceed():
290                     # My turn arrived, but the job has been done without
291                     # me.
292                     return
293                 self.run_with_limiter(limiter)
294
295         def run_with_limiter(self, limiter):
296             if self.service.finished():
297                 return
298             _logger.debug("KeepWriterThread %s proceeding %s %s",
299                           str(threading.current_thread()),
300                           self.args['data_hash'],
301                           self.args['service_root'])
302             h = httplib2.Http(timeout=self.args.get('timeout', None))
303             self._success = bool(self.service.put(
304                     h, self.args['data_hash'], self.args['data']))
305             status = self.service.last_status()
306             if self._success:
307                 resp, body = self.service.last_result
308                 _logger.debug("KeepWriterThread %s succeeded %s %s",
309                               str(threading.current_thread()),
310                               self.args['data_hash'],
311                               self.args['service_root'])
312                 # Tick the 'done' counter for the number of replica
313                 # reported stored by the server, for the case that
314                 # we're talking to a proxy or other backend that
315                 # stores to multiple copies for us.
316                 try:
317                     replicas_stored = int(resp['x-keep-replicas-stored'])
318                 except (KeyError, ValueError):
319                     replicas_stored = 1
320                 limiter.save_response(body.strip(), replicas_stored)
321             elif status is not None:
322                 _logger.debug("Request fail: PUT %s => %s %s",
323                               self.args['data_hash'], status,
324                               self.service.last_result[1])
325
326
327     def __init__(self, api_client=None, proxy=None, timeout=60,
328                  api_token=None, local_store=None):
329         """Initialize a new KeepClient.
330
331         Arguments:
332         * api_client: The API client to use to find Keep services.  If not
333           provided, KeepClient will build one from available Arvados
334           configuration.
335         * proxy: If specified, this KeepClient will send requests to this
336           Keep proxy.  Otherwise, KeepClient will fall back to the setting
337           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
338           ensure KeepClient does not use a proxy, pass in an empty string.
339         * timeout: The timeout for all HTTP requests, in seconds.  Default
340           60.
341         * api_token: If you're not using an API client, but only talking
342           directly to a Keep proxy, this parameter specifies an API token
343           to authenticate Keep requests.  It is an error to specify both
344           api_client and api_token.  If you specify neither, KeepClient
345           will use one available from the Arvados configuration.
346         * local_store: If specified, this KeepClient will bypass Keep
347           services, and save data to the named directory.  If unspecified,
348           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
349           environment variable.  If you want to ensure KeepClient does not
350           use local storage, pass in an empty string.  This is primarily
351           intended to mock a server for testing.
352         """
353         self.lock = threading.Lock()
354         if proxy is None:
355             proxy = config.get('ARVADOS_KEEP_PROXY')
356         if api_token is None:
357             api_token = config.get('ARVADOS_API_TOKEN')
358         elif api_client is not None:
359             raise ValueError(
360                 "can't build KeepClient with both API client and token")
361         if local_store is None:
362             local_store = os.environ.get('KEEP_LOCAL_STORE')
363
364         if local_store:
365             self.local_store = local_store
366             self.get = self.local_store_get
367             self.put = self.local_store_put
368         else:
369             self.timeout = timeout
370             self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
371             self._cache = []
372             self._cache_lock = threading.Lock()
373             if proxy:
374                 if not proxy.endswith('/'):
375                     proxy += '/'
376                 self.api_token = api_token
377                 self.service_roots = [proxy]
378                 self.using_proxy = True
379                 self.static_service_roots = True
380             else:
381                 # It's important to avoid instantiating an API client
382                 # unless we actually need one, for testing's sake.
383                 if api_client is None:
384                     api_client = arvados.api('v1')
385                 self.api_client = api_client
386                 self.api_token = api_client.api_token
387                 self.service_roots = None
388                 self.using_proxy = None
389                 self.static_service_roots = False
390
391     def build_service_roots(self, force_rebuild=False):
392         if (self.static_service_roots or
393               (self.service_roots and not force_rebuild)):
394             return
395         with self.lock:
396             try:
397                 keep_services = self.api_client.keep_services().accessible()
398             except Exception:  # API server predates Keep services.
399                 keep_services = self.api_client.keep_disks().list()
400
401             keep_services = keep_services.execute().get('items')
402             if not keep_services:
403                 raise arvados.errors.NoKeepServersError()
404
405             self.using_proxy = (keep_services[0].get('service_type') ==
406                                 'proxy')
407
408             roots = (("http%s://%s:%d/" %
409                       ('s' if f['service_ssl_flag'] else '',
410                        f['service_host'],
411                        f['service_port']))
412                      for f in keep_services)
413             self.service_roots = sorted(set(roots))
414             _logger.debug(str(self.service_roots))
415
416     def shuffled_service_roots(self, hash, force_rebuild=False):
417         self.build_service_roots(force_rebuild)
418
419         # Build an ordering with which to query the Keep servers based on the
420         # contents of the hash.
421         # "hash" is a hex-encoded number at least 8 digits
422         # (32 bits) long
423
424         # seed used to calculate the next keep server from 'pool'
425         # to be added to 'pseq'
426         seed = hash
427
428         # Keep servers still to be added to the ordering
429         pool = self.service_roots[:]
430
431         # output probe sequence
432         pseq = []
433
434         # iterate while there are servers left to be assigned
435         while len(pool) > 0:
436             if len(seed) < 8:
437                 # ran out of digits in the seed
438                 if len(pseq) < len(hash) / 4:
439                     # the number of servers added to the probe sequence is less
440                     # than the number of 4-digit slices in 'hash' so refill the
441                     # seed with the last 4 digits and then append the contents
442                     # of 'hash'.
443                     seed = hash[-4:] + hash
444                 else:
445                     # refill the seed with the contents of 'hash'
446                     seed += hash
447
448             # Take the next 8 digits (32 bytes) and interpret as an integer,
449             # then modulus with the size of the remaining pool to get the next
450             # selected server.
451             probe = int(seed[0:8], 16) % len(pool)
452
453             # Append the selected server to the probe sequence and remove it
454             # from the pool.
455             pseq += [pool[probe]]
456             pool = pool[:probe] + pool[probe+1:]
457
458             # Remove the digits just used from the seed
459             seed = seed[8:]
460         _logger.debug(str(pseq))
461         return pseq
462
463     class CacheSlot(object):
464         def __init__(self, locator):
465             self.locator = locator
466             self.ready = threading.Event()
467             self.content = None
468
469         def get(self):
470             self.ready.wait()
471             return self.content
472
473         def set(self, value):
474             self.content = value
475             self.ready.set()
476
477         def size(self):
478             if self.content == None:
479                 return 0
480             else:
481                 return len(self.content)
482
483     def cap_cache(self):
484         '''Cap the cache size to self.cache_max'''
485         self._cache_lock.acquire()
486         try:
487             self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
488             sm = sum([slot.size() for slot in self._cache])
489             while sm > self.cache_max:
490                 del self._cache[-1]
491                 sm = sum([slot.size() for a in self._cache])
492         finally:
493             self._cache_lock.release()
494
495     def reserve_cache(self, locator):
496         '''Reserve a cache slot for the specified locator,
497         or return the existing slot.'''
498         self._cache_lock.acquire()
499         try:
500             # Test if the locator is already in the cache
501             for i in xrange(0, len(self._cache)):
502                 if self._cache[i].locator == locator:
503                     n = self._cache[i]
504                     if i != 0:
505                         # move it to the front
506                         del self._cache[i]
507                         self._cache.insert(0, n)
508                     return n, False
509
510             # Add a new cache slot for the locator
511             n = KeepClient.CacheSlot(locator)
512             self._cache.insert(0, n)
513             return n, True
514         finally:
515             self._cache_lock.release()
516
517     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
518         # roots_map is a dictionary, mapping Keep service root strings
519         # to KeepService objects.  Poll for Keep services, and add any
520         # new ones to roots_map.  Return the current list of local
521         # root strings.
522         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
523         local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
524         for root in local_roots:
525             if root not in roots_map:
526                 roots_map[root] = self.KeepService(root, **headers)
527         return local_roots
528
529     @staticmethod
530     def _check_loop_result(result):
531         # KeepClient RetryLoops should save results as a 2-tuple: the
532         # actual result of the request, and the number of servers available
533         # to receive the request this round.
534         # This method returns True if there's a real result, False if
535         # there are no more servers available, otherwise None.
536         if isinstance(result, Exception):
537             return None
538         result, tried_server_count = result
539         if (result is not None) and (result is not False):
540             return True
541         elif tried_server_count < 1:
542             _logger.info("No more Keep services to try; giving up")
543             return False
544         else:
545             return None
546
547     def get(self, loc_s, num_retries=0):
548         """Get data from Keep.
549
550         This method fetches one or more blocks of data from Keep.  It
551         sends a request each Keep service registered with the API
552         server (or the proxy provided when this client was
553         instantiated), then each service named in location hints, in
554         sequence.  As soon as one service provides the data, it's
555         returned.
556
557         Arguments:
558         * loc_s: A string of one or more comma-separated locators to fetch.
559           This method returns the concatenation of these blocks.
560         * num_retries: The number of times to retry GET requests to
561           *each* Keep server if it returns temporary failures, with
562           exponential backoff.  Note that, in each loop, the method may try
563           to fetch data from every available Keep service, along with any
564           that are named in location hints in the locator.  Default 0.
565         """
566         if ',' in loc_s:
567             return ''.join(self.get(x) for x in loc_s.split(','))
568         locator = KeepLocator(loc_s)
569         expect_hash = locator.md5sum
570
571         slot, first = self.reserve_cache(expect_hash)
572         if not first:
573             v = slot.get()
574             return v
575
576         # See #3147 for a discussion of the loop implementation.  Highlights:
577         # * Refresh the list of Keep services after each failure, in case
578         #   it's being updated.
579         # * Retry until we succeed, we're out of retries, or every available
580         #   service has returned permanent failure.
581         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
582                       for hint in locator.hints if hint.startswith('K@')]
583         # Map root URLs their KeepService objects.
584         roots_map = {root: self.KeepService(root) for root in hint_roots}
585         blob = None
586         loop = retry.RetryLoop(num_retries, self._check_loop_result,
587                                backoff_start=2)
588         for tries_left in loop:
589             try:
590                 local_roots = self.map_new_services(
591                     roots_map, expect_hash,
592                     force_rebuild=(tries_left < num_retries))
593             except Exception as error:
594                 loop.save_result(error)
595                 continue
596
597             # Query KeepService objects that haven't returned
598             # permanent failure, in our specified shuffle order.
599             services_to_try = [roots_map[root]
600                                for root in (local_roots + hint_roots)
601                                if roots_map[root].usable()]
602             http = httplib2.Http(timeout=self.timeout)
603             for keep_service in services_to_try:
604                 blob = keep_service.get(http, locator)
605                 if blob is not None:
606                     break
607             loop.save_result((blob, len(services_to_try)))
608
609         # Always cache the result, then return it if we succeeded.
610         slot.set(blob)
611         self.cap_cache()
612         if loop.success():
613             return blob
614
615         # No servers fulfilled the request.  Count how many responded
616         # "not found;" if the ratio is high enough (currently 75%), report
617         # Not Found; otherwise a generic error.
618         # Q: Including 403 is necessary for the Keep tests to continue
619         # passing, but maybe they should expect KeepReadError instead?
620         not_founds = sum(1 for ks in roots_map.values()
621                          if ks.last_status() in set([403, 404, 410]))
622         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
623             raise arvados.errors.NotFoundError(loc_s)
624         else:
625             raise arvados.errors.KeepReadError(loc_s)
626
627     def put(self, data, copies=2, num_retries=0):
628         """Save data in Keep.
629
630         This method will get a list of Keep services from the API server, and
631         send the data to each one simultaneously in a new thread.  Once the
632         uploads are finished, if enough copies are saved, this method returns
633         the most recent HTTP response body.  If requests fail to upload
634         enough copies, this method raises KeepWriteError.
635
636         Arguments:
637         * data: The string of data to upload.
638         * copies: The number of copies that the user requires be saved.
639           Default 2.
640         * num_retries: The number of times to retry PUT requests to
641           *each* Keep server if it returns temporary failures, with
642           exponential backoff.  Default 0.
643         """
644         data_hash = hashlib.md5(data).hexdigest()
645         if copies < 1:
646             return data_hash
647
648         headers = {}
649         if self.using_proxy:
650             # Tell the proxy how many copies we want it to store
651             headers['X-Keep-Desired-Replication'] = str(copies)
652         roots_map = {}
653         thread_limiter = KeepClient.ThreadLimiter(copies)
654         loop = retry.RetryLoop(num_retries, self._check_loop_result,
655                                backoff_start=2)
656         for tries_left in loop:
657             try:
658                 local_roots = self.map_new_services(
659                     roots_map, data_hash,
660                     force_rebuild=(tries_left < num_retries), **headers)
661             except Exception as error:
662                 loop.save_result(error)
663                 continue
664
665             threads = []
666             for service_root, ks in roots_map.iteritems():
667                 if ks.finished():
668                     continue
669                 t = KeepClient.KeepWriterThread(
670                     ks,
671                     data=data,
672                     data_hash=data_hash,
673                     service_root=service_root,
674                     thread_limiter=thread_limiter,
675                     timeout=self.timeout)
676                 t.start()
677                 threads.append(t)
678             for t in threads:
679                 t.join()
680             loop.save_result((thread_limiter.done() >= copies, len(threads)))
681
682         if loop.success():
683             return thread_limiter.response()
684         raise arvados.errors.KeepWriteError(
685             "Write fail for %s: wanted %d but wrote %d" %
686             (data_hash, copies, thread_limiter.done()))
687
688     def local_store_put(self, data):
689         md5 = hashlib.md5(data).hexdigest()
690         locator = '%s+%d' % (md5, len(data))
691         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
692             f.write(data)
693         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
694                   os.path.join(self.local_store, md5))
695         return locator
696
697     def local_store_get(self, loc_s):
698         try:
699             locator = KeepLocator(loc_s)
700         except ValueError:
701             raise arvados.errors.NotFoundError(
702                 "Invalid data locator: '%s'" % loc_s)
703         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
704             return ''
705         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
706             return f.read()