3699: code review
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import timer
21 import datetime
22 import ssl
23 import socket
24
25 _logger = logging.getLogger('arvados.keep')
26 global_client_object = None
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33
34 class KeepLocator(object):
35     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
36     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37
38     def __init__(self, locator_str):
39         self.hints = []
40         self._perm_sig = None
41         self._perm_expiry = None
42         pieces = iter(locator_str.split('+'))
43         self.md5sum = next(pieces)
44         try:
45             self.size = int(next(pieces))
46         except StopIteration:
47             self.size = None
48         for hint in pieces:
49             if self.HINT_RE.match(hint) is None:
50                 raise ValueError("unrecognized hint data {}".format(hint))
51             elif hint.startswith('A'):
52                 self.parse_permission_hint(hint)
53             else:
54                 self.hints.append(hint)
55
56     def __str__(self):
57         return '+'.join(
58             str(s) for s in [self.md5sum, self.size,
59                              self.permission_hint()] + self.hints
60             if s is not None)
61
62     def _make_hex_prop(name, length):
63         # Build and return a new property with the given name that
64         # must be a hex string of the given length.
65         data_name = '_{}'.format(name)
66         def getter(self):
67             return getattr(self, data_name)
68         def setter(self, hex_str):
69             if not arvados.util.is_hex(hex_str, length):
70                 raise ValueError("{} must be a {}-digit hex string: {}".
71                                  format(name, length, hex_str))
72             setattr(self, data_name, hex_str)
73         return property(getter, setter)
74
75     md5sum = _make_hex_prop('md5sum', 32)
76     perm_sig = _make_hex_prop('perm_sig', 40)
77
78     @property
79     def perm_expiry(self):
80         return self._perm_expiry
81
82     @perm_expiry.setter
83     def perm_expiry(self, value):
84         if not arvados.util.is_hex(value, 1, 8):
85             raise ValueError(
86                 "permission timestamp must be a hex Unix timestamp: {}".
87                 format(value))
88         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
89
90     def permission_hint(self):
91         data = [self.perm_sig, self.perm_expiry]
92         if None in data:
93             return None
94         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
95         return "A{}@{:08x}".format(*data)
96
97     def parse_permission_hint(self, s):
98         try:
99             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
100         except IndexError:
101             raise ValueError("bad permission hint {}".format(s))
102
103     def permission_expired(self, as_of_dt=None):
104         if self.perm_expiry is None:
105             return False
106         elif as_of_dt is None:
107             as_of_dt = datetime.datetime.now()
108         return self.perm_expiry <= as_of_dt
109
110
111 class Keep(object):
112     """Simple interface to a global KeepClient object.
113
114     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
115     own API client.  The global KeepClient will build an API client from the
116     current Arvados configuration, which may not match the one you built.
117     """
118     _last_key = None
119
120     @classmethod
121     def global_client_object(cls):
122         global global_client_object
123         # Previously, KeepClient would change its behavior at runtime based
124         # on these configuration settings.  We simulate that behavior here
125         # by checking the values and returning a new KeepClient if any of
126         # them have changed.
127         key = (config.get('ARVADOS_API_HOST'),
128                config.get('ARVADOS_API_TOKEN'),
129                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
130                config.get('ARVADOS_KEEP_PROXY'),
131                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
132                os.environ.get('KEEP_LOCAL_STORE'))
133         if (global_client_object is None) or (cls._last_key != key):
134             global_client_object = KeepClient()
135             cls._last_key = key
136         return global_client_object
137
138     @staticmethod
139     def get(locator, **kwargs):
140         return Keep.global_client_object().get(locator, **kwargs)
141
142     @staticmethod
143     def put(data, **kwargs):
144         return Keep.global_client_object().put(data, **kwargs)
145
146
147 class KeepClient(object):
148     class ThreadLimiter(object):
149         """
150         Limit the number of threads running at a given time to
151         {desired successes} minus {successes reported}. When successes
152         reported == desired, wake up the remaining threads and tell
153         them to quit.
154
155         Should be used in a "with" block.
156         """
157         def __init__(self, todo):
158             self._todo = todo
159             self._done = 0
160             self._response = None
161             self._todo_lock = threading.Semaphore(todo)
162             self._done_lock = threading.Lock()
163
164         def __enter__(self):
165             self._todo_lock.acquire()
166             return self
167
168         def __exit__(self, type, value, traceback):
169             self._todo_lock.release()
170
171         def shall_i_proceed(self):
172             """
173             Return true if the current thread should do stuff. Return
174             false if the current thread should just stop.
175             """
176             with self._done_lock:
177                 return (self._done < self._todo)
178
179         def save_response(self, response_body, replicas_stored):
180             """
181             Records a response body (a locator, possibly signed) returned by
182             the Keep server.  It is not necessary to save more than
183             one response, since we presume that any locator returned
184             in response to a successful request is valid.
185             """
186             with self._done_lock:
187                 self._done += replicas_stored
188                 self._response = response_body
189
190         def response(self):
191             """
192             Returns the body from the response to a PUT request.
193             """
194             with self._done_lock:
195                 return self._response
196
197         def done(self):
198             """
199             Return how many successes were reported.
200             """
201             with self._done_lock:
202                 return self._done
203
204
205     class KeepService(object):
206         # Make requests to a single Keep service, and track results.
207         HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
208                        socket.error, ssl.SSLError)
209
210         def __init__(self, root, **headers):
211             self.root = root
212             self.last_result = None
213             self.success_flag = None
214             self.get_headers = {'Accept': 'application/octet-stream'}
215             self.get_headers.update(headers)
216             self.put_headers = headers
217
218         def usable(self):
219             return self.success_flag is not False
220
221         def finished(self):
222             return self.success_flag is not None
223
224         def last_status(self):
225             try:
226                 return int(self.last_result[0].status)
227             except (AttributeError, IndexError, ValueError):
228                 return None
229
230         def get(self, http, locator):
231             # http is an httplib2.Http object.
232             # locator is a KeepLocator object.
233             url = self.root + str(locator)
234             _logger.debug("Request: GET %s", url)
235             try:
236                 with timer.Timer() as t:
237                     result = http.request(url.encode('utf-8'), 'GET',
238                                           headers=self.get_headers)
239             except self.HTTP_ERRORS as e:
240                 _logger.debug("Request fail: GET %s => %s: %s",
241                               url, type(e), str(e))
242                 self.last_result = e
243             else:
244                 self.last_result = result
245                 self.success_flag = retry.check_http_response_success(result)
246                 content = result[1]
247                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
248                              self.last_status(), len(content), t.msecs,
249                              (len(content)/(1024.0*1024))/t.secs)
250                 if self.success_flag:
251                     resp_md5 = hashlib.md5(content).hexdigest()
252                     if resp_md5 == locator.md5sum:
253                         return content
254                     _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
255             return None
256
257         def put(self, http, hash_s, body):
258             url = self.root + hash_s
259             _logger.debug("Request: PUT %s", url)
260             try:
261                 result = http.request(url.encode('utf-8'), 'PUT',
262                                       headers=self.put_headers, body=body)
263             except self.HTTP_ERRORS as e:
264                 _logger.debug("Request fail: PUT %s => %s: %s",
265                               url, type(e), str(e))
266                 self.last_result = e
267             else:
268                 self.last_result = result
269                 self.success_flag = retry.check_http_response_success(result)
270             return self.success_flag
271
272
273     class KeepWriterThread(threading.Thread):
274         """
275         Write a blob of data to the given Keep server. On success, call
276         save_response() of the given ThreadLimiter to save the returned
277         locator.
278         """
279         def __init__(self, keep_service, **kwargs):
280             super(KeepClient.KeepWriterThread, self).__init__()
281             self.service = keep_service
282             self.args = kwargs
283             self._success = False
284
285         def success(self):
286             return self._success
287
288         def run(self):
289             with self.args['thread_limiter'] as limiter:
290                 if not limiter.shall_i_proceed():
291                     # My turn arrived, but the job has been done without
292                     # me.
293                     return
294                 self.run_with_limiter(limiter)
295
296         def run_with_limiter(self, limiter):
297             if self.service.finished():
298                 return
299             _logger.debug("KeepWriterThread %s proceeding %s %s",
300                           str(threading.current_thread()),
301                           self.args['data_hash'],
302                           self.args['service_root'])
303             h = httplib2.Http(timeout=self.args.get('timeout', None))
304             self._success = bool(self.service.put(
305                     h, self.args['data_hash'], self.args['data']))
306             status = self.service.last_status()
307             if self._success:
308                 resp, body = self.service.last_result
309                 _logger.debug("KeepWriterThread %s succeeded %s %s",
310                               str(threading.current_thread()),
311                               self.args['data_hash'],
312                               self.args['service_root'])
313                 # Tick the 'done' counter for the number of replica
314                 # reported stored by the server, for the case that
315                 # we're talking to a proxy or other backend that
316                 # stores to multiple copies for us.
317                 try:
318                     replicas_stored = int(resp['x-keep-replicas-stored'])
319                 except (KeyError, ValueError):
320                     replicas_stored = 1
321                 limiter.save_response(body.strip(), replicas_stored)
322             elif status is not None:
323                 _logger.debug("Request fail: PUT %s => %s %s",
324                               self.args['data_hash'], status,
325                               self.service.last_result[1])
326
327
328     def __init__(self, api_client=None, proxy=None, timeout=300,
329                  api_token=None, local_store=None):
330         """Initialize a new KeepClient.
331
332         Arguments:
333         * api_client: The API client to use to find Keep services.  If not
334           provided, KeepClient will build one from available Arvados
335           configuration.
336         * proxy: If specified, this KeepClient will send requests to this
337           Keep proxy.  Otherwise, KeepClient will fall back to the setting
338           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
339           ensure KeepClient does not use a proxy, pass in an empty string.
340         * timeout: The timeout for all HTTP requests, in seconds.  Default
341           300.
342         * api_token: If you're not using an API client, but only talking
343           directly to a Keep proxy, this parameter specifies an API token
344           to authenticate Keep requests.  It is an error to specify both
345           api_client and api_token.  If you specify neither, KeepClient
346           will use one available from the Arvados configuration.
347         * local_store: If specified, this KeepClient will bypass Keep
348           services, and save data to the named directory.  If unspecified,
349           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
350           environment variable.  If you want to ensure KeepClient does not
351           use local storage, pass in an empty string.  This is primarily
352           intended to mock a server for testing.
353         """
354         self.lock = threading.Lock()
355         if proxy is None:
356             proxy = config.get('ARVADOS_KEEP_PROXY')
357         if api_token is None:
358             api_token = config.get('ARVADOS_API_TOKEN')
359         elif api_client is not None:
360             raise ValueError(
361                 "can't build KeepClient with both API client and token")
362         if local_store is None:
363             local_store = os.environ.get('KEEP_LOCAL_STORE')
364
365         if local_store:
366             self.local_store = local_store
367             self.get = self.local_store_get
368             self.put = self.local_store_put
369         else:
370             self.timeout = timeout
371             self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
372             self._cache = []
373             self._cache_lock = threading.Lock()
374             if proxy:
375                 if not proxy.endswith('/'):
376                     proxy += '/'
377                 self.api_token = api_token
378                 self.service_roots = [proxy]
379                 self.using_proxy = True
380                 self.static_service_roots = True
381             else:
382                 # It's important to avoid instantiating an API client
383                 # unless we actually need one, for testing's sake.
384                 if api_client is None:
385                     api_client = arvados.api('v1')
386                 self.api_client = api_client
387                 self.api_token = api_client.api_token
388                 self.service_roots = None
389                 self.using_proxy = None
390                 self.static_service_roots = False
391
392     def build_service_roots(self, force_rebuild=False):
393         if (self.static_service_roots or
394               (self.service_roots and not force_rebuild)):
395             return
396         with self.lock:
397             try:
398                 keep_services = self.api_client.keep_services().accessible()
399             except Exception:  # API server predates Keep services.
400                 keep_services = self.api_client.keep_disks().list()
401
402             keep_services = keep_services.execute().get('items')
403             if not keep_services:
404                 raise arvados.errors.NoKeepServersError()
405
406             self.using_proxy = (keep_services[0].get('service_type') ==
407                                 'proxy')
408
409             roots = (("http%s://%s:%d/" %
410                       ('s' if f['service_ssl_flag'] else '',
411                        f['service_host'],
412                        f['service_port']))
413                      for f in keep_services)
414             self.service_roots = sorted(set(roots))
415             _logger.debug(str(self.service_roots))
416
417     def shuffled_service_roots(self, hash, force_rebuild=False):
418         self.build_service_roots(force_rebuild)
419
420         # Build an ordering with which to query the Keep servers based on the
421         # contents of the hash.
422         # "hash" is a hex-encoded number at least 8 digits
423         # (32 bits) long
424
425         # seed used to calculate the next keep server from 'pool'
426         # to be added to 'pseq'
427         seed = hash
428
429         # Keep servers still to be added to the ordering
430         pool = self.service_roots[:]
431
432         # output probe sequence
433         pseq = []
434
435         # iterate while there are servers left to be assigned
436         while len(pool) > 0:
437             if len(seed) < 8:
438                 # ran out of digits in the seed
439                 if len(pseq) < len(hash) / 4:
440                     # the number of servers added to the probe sequence is less
441                     # than the number of 4-digit slices in 'hash' so refill the
442                     # seed with the last 4 digits and then append the contents
443                     # of 'hash'.
444                     seed = hash[-4:] + hash
445                 else:
446                     # refill the seed with the contents of 'hash'
447                     seed += hash
448
449             # Take the next 8 digits (32 bytes) and interpret as an integer,
450             # then modulus with the size of the remaining pool to get the next
451             # selected server.
452             probe = int(seed[0:8], 16) % len(pool)
453
454             # Append the selected server to the probe sequence and remove it
455             # from the pool.
456             pseq += [pool[probe]]
457             pool = pool[:probe] + pool[probe+1:]
458
459             # Remove the digits just used from the seed
460             seed = seed[8:]
461         _logger.debug(str(pseq))
462         return pseq
463
464     class CacheSlot(object):
465         def __init__(self, locator):
466             self.locator = locator
467             self.ready = threading.Event()
468             self.content = None
469
470         def get(self):
471             self.ready.wait()
472             return self.content
473
474         def set(self, value):
475             self.content = value
476             self.ready.set()
477
478         def size(self):
479             if self.content == None:
480                 return 0
481             else:
482                 return len(self.content)
483
484     def cap_cache(self):
485         '''Cap the cache size to self.cache_max'''
486         self._cache_lock.acquire()
487         try:
488             self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
489             sm = sum([slot.size() for slot in self._cache])
490             while sm > self.cache_max:
491                 del self._cache[-1]
492                 sm = sum([slot.size() for a in self._cache])
493         finally:
494             self._cache_lock.release()
495
496     def reserve_cache(self, locator):
497         '''Reserve a cache slot for the specified locator,
498         or return the existing slot.'''
499         self._cache_lock.acquire()
500         try:
501             # Test if the locator is already in the cache
502             for i in xrange(0, len(self._cache)):
503                 if self._cache[i].locator == locator:
504                     n = self._cache[i]
505                     if i != 0:
506                         # move it to the front
507                         del self._cache[i]
508                         self._cache.insert(0, n)
509                     return n, False
510
511             # Add a new cache slot for the locator
512             n = KeepClient.CacheSlot(locator)
513             self._cache.insert(0, n)
514             return n, True
515         finally:
516             self._cache_lock.release()
517
518     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
519         # roots_map is a dictionary, mapping Keep service root strings
520         # to KeepService objects.  Poll for Keep services, and add any
521         # new ones to roots_map.  Return the current list of local
522         # root strings.
523         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
524         local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
525         for root in local_roots:
526             if root not in roots_map:
527                 roots_map[root] = self.KeepService(root, **headers)
528         return local_roots
529
530     @staticmethod
531     def _check_loop_result(result):
532         # KeepClient RetryLoops should save results as a 2-tuple: the
533         # actual result of the request, and the number of servers available
534         # to receive the request this round.
535         # This method returns True if there's a real result, False if
536         # there are no more servers available, otherwise None.
537         if isinstance(result, Exception):
538             return None
539         result, tried_server_count = result
540         if (result is not None) and (result is not False):
541             return True
542         elif tried_server_count < 1:
543             _logger.info("No more Keep services to try; giving up")
544             return False
545         else:
546             return None
547
548     def get(self, loc_s, num_retries=0):
549         """Get data from Keep.
550
551         This method fetches one or more blocks of data from Keep.  It
552         sends a request each Keep service registered with the API
553         server (or the proxy provided when this client was
554         instantiated), then each service named in location hints, in
555         sequence.  As soon as one service provides the data, it's
556         returned.
557
558         Arguments:
559         * loc_s: A string of one or more comma-separated locators to fetch.
560           This method returns the concatenation of these blocks.
561         * num_retries: The number of times to retry GET requests to
562           *each* Keep server if it returns temporary failures, with
563           exponential backoff.  Note that, in each loop, the method may try
564           to fetch data from every available Keep service, along with any
565           that are named in location hints in the locator.  Default 0.
566         """
567         if ',' in loc_s:
568             return ''.join(self.get(x) for x in loc_s.split(','))
569         locator = KeepLocator(loc_s)
570         expect_hash = locator.md5sum
571
572         slot, first = self.reserve_cache(expect_hash)
573         if not first:
574             v = slot.get()
575             return v
576
577         # See #3147 for a discussion of the loop implementation.  Highlights:
578         # * Refresh the list of Keep services after each failure, in case
579         #   it's being updated.
580         # * Retry until we succeed, we're out of retries, or every available
581         #   service has returned permanent failure.
582         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
583                       for hint in locator.hints if hint.startswith('K@')]
584         # Map root URLs their KeepService objects.
585         roots_map = {root: self.KeepService(root) for root in hint_roots}
586         blob = None
587         loop = retry.RetryLoop(num_retries, self._check_loop_result,
588                                backoff_start=2)
589         for tries_left in loop:
590             try:
591                 local_roots = self.map_new_services(
592                     roots_map, expect_hash,
593                     force_rebuild=(tries_left < num_retries))
594             except Exception as error:
595                 loop.save_result(error)
596                 continue
597
598             # Query KeepService objects that haven't returned
599             # permanent failure, in our specified shuffle order.
600             services_to_try = [roots_map[root]
601                                for root in (local_roots + hint_roots)
602                                if roots_map[root].usable()]
603             http = httplib2.Http(timeout=self.timeout)
604             for keep_service in services_to_try:
605                 blob = keep_service.get(http, locator)
606                 if blob is not None:
607                     break
608             loop.save_result((blob, len(services_to_try)))
609
610         # Always cache the result, then return it if we succeeded.
611         slot.set(blob)
612         self.cap_cache()
613         if loop.success():
614             return blob
615
616         # No servers fulfilled the request.  Count how many responded
617         # "not found;" if the ratio is high enough (currently 75%), report
618         # Not Found; otherwise a generic error.
619         # Q: Including 403 is necessary for the Keep tests to continue
620         # passing, but maybe they should expect KeepReadError instead?
621         not_founds = sum(1 for ks in roots_map.values()
622                          if ks.last_status() in set([403, 404, 410]))
623         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
624             raise arvados.errors.NotFoundError(loc_s)
625         else:
626             raise arvados.errors.KeepReadError(loc_s)
627
628     def put(self, data, copies=2, num_retries=0):
629         """Save data in Keep.
630
631         This method will get a list of Keep services from the API server, and
632         send the data to each one simultaneously in a new thread.  Once the
633         uploads are finished, if enough copies are saved, this method returns
634         the most recent HTTP response body.  If requests fail to upload
635         enough copies, this method raises KeepWriteError.
636
637         Arguments:
638         * data: The string of data to upload.
639         * copies: The number of copies that the user requires be saved.
640           Default 2.
641         * num_retries: The number of times to retry PUT requests to
642           *each* Keep server if it returns temporary failures, with
643           exponential backoff.  Default 0.
644         """
645         data_hash = hashlib.md5(data).hexdigest()
646         if copies < 1:
647             return data_hash
648
649         headers = {}
650         if self.using_proxy:
651             # Tell the proxy how many copies we want it to store
652             headers['X-Keep-Desired-Replication'] = str(copies)
653         roots_map = {}
654         thread_limiter = KeepClient.ThreadLimiter(copies)
655         loop = retry.RetryLoop(num_retries, self._check_loop_result,
656                                backoff_start=2)
657         for tries_left in loop:
658             try:
659                 local_roots = self.map_new_services(
660                     roots_map, data_hash,
661                     force_rebuild=(tries_left < num_retries), **headers)
662             except Exception as error:
663                 loop.save_result(error)
664                 continue
665
666             threads = []
667             for service_root, ks in roots_map.iteritems():
668                 if ks.finished():
669                     continue
670                 t = KeepClient.KeepWriterThread(
671                     ks,
672                     data=data,
673                     data_hash=data_hash,
674                     service_root=service_root,
675                     thread_limiter=thread_limiter,
676                     timeout=self.timeout)
677                 t.start()
678                 threads.append(t)
679             for t in threads:
680                 t.join()
681             loop.save_result((thread_limiter.done() >= copies, len(threads)))
682
683         if loop.success():
684             return thread_limiter.response()
685         raise arvados.errors.KeepWriteError(
686             "Write fail for %s: wanted %d but wrote %d" %
687             (data_hash, copies, thread_limiter.done()))
688
689     def local_store_put(self, data):
690         md5 = hashlib.md5(data).hexdigest()
691         locator = '%s+%d' % (md5, len(data))
692         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
693             f.write(data)
694         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
695                   os.path.join(self.local_store, md5))
696         return locator
697
698     def local_store_get(self, loc_s):
699         try:
700             locator = KeepLocator(loc_s)
701         except ValueError:
702             raise arvados.errors.NotFoundError(
703                 "Invalid data locator: '%s'" % loc_s)
704         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
705             return ''
706         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
707             return f.read()