Merge branch '3411-expire-collections'
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import timer
21 import datetime
22 import ssl
23 import socket
24
25 _logger = logging.getLogger('arvados.keep')
26 global_client_object = None
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33
34 class KeepLocator(object):
35     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
36     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37
38     def __init__(self, locator_str):
39         self.hints = []
40         self._perm_sig = None
41         self._perm_expiry = None
42         pieces = iter(locator_str.split('+'))
43         self.md5sum = next(pieces)
44         try:
45             self.size = int(next(pieces))
46         except StopIteration:
47             self.size = None
48         for hint in pieces:
49             if self.HINT_RE.match(hint) is None:
50                 raise ValueError("unrecognized hint data {}".format(hint))
51             elif hint.startswith('A'):
52                 self.parse_permission_hint(hint)
53             else:
54                 self.hints.append(hint)
55
56     def __str__(self):
57         return '+'.join(
58             str(s) for s in [self.md5sum, self.size,
59                              self.permission_hint()] + self.hints
60             if s is not None)
61
62     def _make_hex_prop(name, length):
63         # Build and return a new property with the given name that
64         # must be a hex string of the given length.
65         data_name = '_{}'.format(name)
66         def getter(self):
67             return getattr(self, data_name)
68         def setter(self, hex_str):
69             if not arvados.util.is_hex(hex_str, length):
70                 raise ValueError("{} must be a {}-digit hex string: {}".
71                                  format(name, length, hex_str))
72             setattr(self, data_name, hex_str)
73         return property(getter, setter)
74
75     md5sum = _make_hex_prop('md5sum', 32)
76     perm_sig = _make_hex_prop('perm_sig', 40)
77
78     @property
79     def perm_expiry(self):
80         return self._perm_expiry
81
82     @perm_expiry.setter
83     def perm_expiry(self, value):
84         if not arvados.util.is_hex(value, 1, 8):
85             raise ValueError(
86                 "permission timestamp must be a hex Unix timestamp: {}".
87                 format(value))
88         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
89
90     def permission_hint(self):
91         data = [self.perm_sig, self.perm_expiry]
92         if None in data:
93             return None
94         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
95         return "A{}@{:08x}".format(*data)
96
97     def parse_permission_hint(self, s):
98         try:
99             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
100         except IndexError:
101             raise ValueError("bad permission hint {}".format(s))
102
103     def permission_expired(self, as_of_dt=None):
104         if self.perm_expiry is None:
105             return False
106         elif as_of_dt is None:
107             as_of_dt = datetime.datetime.now()
108         return self.perm_expiry <= as_of_dt
109
110
111 class Keep(object):
112     """Simple interface to a global KeepClient object.
113
114     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
115     own API client.  The global KeepClient will build an API client from the
116     current Arvados configuration, which may not match the one you built.
117     """
118     _last_key = None
119
120     @classmethod
121     def global_client_object(cls):
122         global global_client_object
123         # Previously, KeepClient would change its behavior at runtime based
124         # on these configuration settings.  We simulate that behavior here
125         # by checking the values and returning a new KeepClient if any of
126         # them have changed.
127         key = (config.get('ARVADOS_API_HOST'),
128                config.get('ARVADOS_API_TOKEN'),
129                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
130                config.get('ARVADOS_KEEP_PROXY'),
131                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
132                os.environ.get('KEEP_LOCAL_STORE'))
133         if (global_client_object is None) or (cls._last_key != key):
134             global_client_object = KeepClient()
135             cls._last_key = key
136         return global_client_object
137
138     @staticmethod
139     def get(locator, **kwargs):
140         return Keep.global_client_object().get(locator, **kwargs)
141
142     @staticmethod
143     def put(data, **kwargs):
144         return Keep.global_client_object().put(data, **kwargs)
145
146 class KeepBlockCache(object):
147     # Default RAM cache is 256MiB
148     def __init__(self, cache_max=(256 * 1024 * 1024)):
149         self.cache_max = cache_max
150         self._cache = []
151         self._cache_lock = threading.Lock()
152
153     class CacheSlot(object):
154         def __init__(self, locator):
155             self.locator = locator
156             self.ready = threading.Event()
157             self.content = None
158
159         def get(self):
160             self.ready.wait()
161             return self.content
162
163         def set(self, value):
164             self.content = value
165             self.ready.set()
166
167         def size(self):
168             if self.content == None:
169                 return 0
170             else:
171                 return len(self.content)
172
173     def cap_cache(self):
174         '''Cap the cache size to self.cache_max'''
175         self._cache_lock.acquire()
176         try:
177             # Select all slots except those where ready.is_set() and content is
178             # None (that means there was an error reading the block).
179             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
180             sm = sum([slot.size() for slot in self._cache])
181             while len(self._cache) > 0 and sm > self.cache_max:
182                 for i in xrange(len(self._cache)-1, -1, -1):
183                     if self._cache[i].ready.is_set():
184                         del self._cache[i]
185                         break
186                 sm = sum([slot.size() for slot in self._cache])
187         finally:
188             self._cache_lock.release()
189
190     def reserve_cache(self, locator):
191         '''Reserve a cache slot for the specified locator,
192         or return the existing slot.'''
193         self._cache_lock.acquire()
194         try:
195             # Test if the locator is already in the cache
196             for i in xrange(0, len(self._cache)):
197                 if self._cache[i].locator == locator:
198                     n = self._cache[i]
199                     if i != 0:
200                         # move it to the front
201                         del self._cache[i]
202                         self._cache.insert(0, n)
203                     return n, False
204
205             # Add a new cache slot for the locator
206             n = KeepBlockCache.CacheSlot(locator)
207             self._cache.insert(0, n)
208             return n, True
209         finally:
210             self._cache_lock.release()
211
212 class KeepClient(object):
213     class ThreadLimiter(object):
214         """
215         Limit the number of threads running at a given time to
216         {desired successes} minus {successes reported}. When successes
217         reported == desired, wake up the remaining threads and tell
218         them to quit.
219
220         Should be used in a "with" block.
221         """
222         def __init__(self, todo):
223             self._todo = todo
224             self._done = 0
225             self._response = None
226             self._todo_lock = threading.Semaphore(todo)
227             self._done_lock = threading.Lock()
228
229         def __enter__(self):
230             self._todo_lock.acquire()
231             return self
232
233         def __exit__(self, type, value, traceback):
234             self._todo_lock.release()
235
236         def shall_i_proceed(self):
237             """
238             Return true if the current thread should do stuff. Return
239             false if the current thread should just stop.
240             """
241             with self._done_lock:
242                 return (self._done < self._todo)
243
244         def save_response(self, response_body, replicas_stored):
245             """
246             Records a response body (a locator, possibly signed) returned by
247             the Keep server.  It is not necessary to save more than
248             one response, since we presume that any locator returned
249             in response to a successful request is valid.
250             """
251             with self._done_lock:
252                 self._done += replicas_stored
253                 self._response = response_body
254
255         def response(self):
256             """
257             Returns the body from the response to a PUT request.
258             """
259             with self._done_lock:
260                 return self._response
261
262         def done(self):
263             """
264             Return how many successes were reported.
265             """
266             with self._done_lock:
267                 return self._done
268
269
270     class KeepService(object):
271         # Make requests to a single Keep service, and track results.
272         HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
273                        socket.error, ssl.SSLError)
274
275         def __init__(self, root, **headers):
276             self.root = root
277             self.last_result = None
278             self.success_flag = None
279             self.get_headers = {'Accept': 'application/octet-stream'}
280             self.get_headers.update(headers)
281             self.put_headers = headers
282
283         def usable(self):
284             return self.success_flag is not False
285
286         def finished(self):
287             return self.success_flag is not None
288
289         def last_status(self):
290             try:
291                 return int(self.last_result[0].status)
292             except (AttributeError, IndexError, ValueError):
293                 return None
294
295         def get(self, http, locator):
296             # http is an httplib2.Http object.
297             # locator is a KeepLocator object.
298             url = self.root + str(locator)
299             _logger.debug("Request: GET %s", url)
300             try:
301                 with timer.Timer() as t:
302                     result = http.request(url.encode('utf-8'), 'GET',
303                                           headers=self.get_headers)
304             except self.HTTP_ERRORS as e:
305                 _logger.debug("Request fail: GET %s => %s: %s",
306                               url, type(e), str(e))
307                 self.last_result = e
308             else:
309                 self.last_result = result
310                 self.success_flag = retry.check_http_response_success(result)
311                 content = result[1]
312                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
313                              self.last_status(), len(content), t.msecs,
314                              (len(content)/(1024.0*1024))/t.secs)
315                 if self.success_flag:
316                     resp_md5 = hashlib.md5(content).hexdigest()
317                     if resp_md5 == locator.md5sum:
318                         return content
319                     _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
320             return None
321
322         def put(self, http, hash_s, body):
323             url = self.root + hash_s
324             _logger.debug("Request: PUT %s", url)
325             try:
326                 result = http.request(url.encode('utf-8'), 'PUT',
327                                       headers=self.put_headers, body=body)
328             except self.HTTP_ERRORS as e:
329                 _logger.debug("Request fail: PUT %s => %s: %s",
330                               url, type(e), str(e))
331                 self.last_result = e
332             else:
333                 self.last_result = result
334                 self.success_flag = retry.check_http_response_success(result)
335             return self.success_flag
336
337
338     class KeepWriterThread(threading.Thread):
339         """
340         Write a blob of data to the given Keep server. On success, call
341         save_response() of the given ThreadLimiter to save the returned
342         locator.
343         """
344         def __init__(self, keep_service, **kwargs):
345             super(KeepClient.KeepWriterThread, self).__init__()
346             self.service = keep_service
347             self.args = kwargs
348             self._success = False
349
350         def success(self):
351             return self._success
352
353         def run(self):
354             with self.args['thread_limiter'] as limiter:
355                 if not limiter.shall_i_proceed():
356                     # My turn arrived, but the job has been done without
357                     # me.
358                     return
359                 self.run_with_limiter(limiter)
360
361         def run_with_limiter(self, limiter):
362             if self.service.finished():
363                 return
364             _logger.debug("KeepWriterThread %s proceeding %s %s",
365                           str(threading.current_thread()),
366                           self.args['data_hash'],
367                           self.args['service_root'])
368             h = httplib2.Http(timeout=self.args.get('timeout', None))
369             self._success = bool(self.service.put(
370                     h, self.args['data_hash'], self.args['data']))
371             status = self.service.last_status()
372             if self._success:
373                 resp, body = self.service.last_result
374                 _logger.debug("KeepWriterThread %s succeeded %s %s",
375                               str(threading.current_thread()),
376                               self.args['data_hash'],
377                               self.args['service_root'])
378                 # Tick the 'done' counter for the number of replica
379                 # reported stored by the server, for the case that
380                 # we're talking to a proxy or other backend that
381                 # stores to multiple copies for us.
382                 try:
383                     replicas_stored = int(resp['x-keep-replicas-stored'])
384                 except (KeyError, ValueError):
385                     replicas_stored = 1
386                 limiter.save_response(body.strip(), replicas_stored)
387             elif status is not None:
388                 _logger.debug("Request fail: PUT %s => %s %s",
389                               self.args['data_hash'], status,
390                               self.service.last_result[1])
391
392
393     def __init__(self, api_client=None, proxy=None, timeout=300,
394                  api_token=None, local_store=None, block_cache=None):
395         """Initialize a new KeepClient.
396
397         Arguments:
398         * api_client: The API client to use to find Keep services.  If not
399           provided, KeepClient will build one from available Arvados
400           configuration.
401         * proxy: If specified, this KeepClient will send requests to this
402           Keep proxy.  Otherwise, KeepClient will fall back to the setting
403           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
404           ensure KeepClient does not use a proxy, pass in an empty string.
405         * timeout: The timeout for all HTTP requests, in seconds.  Default
406           300.
407         * api_token: If you're not using an API client, but only talking
408           directly to a Keep proxy, this parameter specifies an API token
409           to authenticate Keep requests.  It is an error to specify both
410           api_client and api_token.  If you specify neither, KeepClient
411           will use one available from the Arvados configuration.
412         * local_store: If specified, this KeepClient will bypass Keep
413           services, and save data to the named directory.  If unspecified,
414           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
415           environment variable.  If you want to ensure KeepClient does not
416           use local storage, pass in an empty string.  This is primarily
417           intended to mock a server for testing.
418         """
419         self.lock = threading.Lock()
420         if proxy is None:
421             proxy = config.get('ARVADOS_KEEP_PROXY')
422         if api_token is None:
423             api_token = config.get('ARVADOS_API_TOKEN')
424         elif api_client is not None:
425             raise ValueError(
426                 "can't build KeepClient with both API client and token")
427         if local_store is None:
428             local_store = os.environ.get('KEEP_LOCAL_STORE')
429
430         self.block_cache = block_cache if block_cache else KeepBlockCache()
431
432         if local_store:
433             self.local_store = local_store
434             self.get = self.local_store_get
435             self.put = self.local_store_put
436         else:
437             self.timeout = timeout
438
439             if proxy:
440                 if not proxy.endswith('/'):
441                     proxy += '/'
442                 self.api_token = api_token
443                 self.service_roots = [proxy]
444                 self.using_proxy = True
445                 self.static_service_roots = True
446             else:
447                 # It's important to avoid instantiating an API client
448                 # unless we actually need one, for testing's sake.
449                 if api_client is None:
450                     api_client = arvados.api('v1')
451                 self.api_client = api_client
452                 self.api_token = api_client.api_token
453                 self.service_roots = None
454                 self.using_proxy = None
455                 self.static_service_roots = False
456
457     def build_service_roots(self, force_rebuild=False):
458         if (self.static_service_roots or
459               (self.service_roots and not force_rebuild)):
460             return
461         with self.lock:
462             try:
463                 keep_services = self.api_client.keep_services().accessible()
464             except Exception:  # API server predates Keep services.
465                 keep_services = self.api_client.keep_disks().list()
466
467             keep_services = keep_services.execute().get('items')
468             if not keep_services:
469                 raise arvados.errors.NoKeepServersError()
470
471             self.using_proxy = (keep_services[0].get('service_type') ==
472                                 'proxy')
473
474             roots = (("http%s://%s:%d/" %
475                       ('s' if f['service_ssl_flag'] else '',
476                        f['service_host'],
477                        f['service_port']))
478                      for f in keep_services)
479             self.service_roots = sorted(set(roots))
480             _logger.debug(str(self.service_roots))
481
482     def shuffled_service_roots(self, hash, force_rebuild=False):
483         self.build_service_roots(force_rebuild)
484
485         # Build an ordering with which to query the Keep servers based on the
486         # contents of the hash.
487         # "hash" is a hex-encoded number at least 8 digits
488         # (32 bits) long
489
490         # seed used to calculate the next keep server from 'pool'
491         # to be added to 'pseq'
492         seed = hash
493
494         # Keep servers still to be added to the ordering
495         pool = self.service_roots[:]
496
497         # output probe sequence
498         pseq = []
499
500         # iterate while there are servers left to be assigned
501         while len(pool) > 0:
502             if len(seed) < 8:
503                 # ran out of digits in the seed
504                 if len(pseq) < len(hash) / 4:
505                     # the number of servers added to the probe sequence is less
506                     # than the number of 4-digit slices in 'hash' so refill the
507                     # seed with the last 4 digits and then append the contents
508                     # of 'hash'.
509                     seed = hash[-4:] + hash
510                 else:
511                     # refill the seed with the contents of 'hash'
512                     seed += hash
513
514             # Take the next 8 digits (32 bytes) and interpret as an integer,
515             # then modulus with the size of the remaining pool to get the next
516             # selected server.
517             probe = int(seed[0:8], 16) % len(pool)
518
519             # Append the selected server to the probe sequence and remove it
520             # from the pool.
521             pseq += [pool[probe]]
522             pool = pool[:probe] + pool[probe+1:]
523
524             # Remove the digits just used from the seed
525             seed = seed[8:]
526         _logger.debug(str(pseq))
527         return pseq
528
529
530     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
531         # roots_map is a dictionary, mapping Keep service root strings
532         # to KeepService objects.  Poll for Keep services, and add any
533         # new ones to roots_map.  Return the current list of local
534         # root strings.
535         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
536         local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
537         for root in local_roots:
538             if root not in roots_map:
539                 roots_map[root] = self.KeepService(root, **headers)
540         return local_roots
541
542     @staticmethod
543     def _check_loop_result(result):
544         # KeepClient RetryLoops should save results as a 2-tuple: the
545         # actual result of the request, and the number of servers available
546         # to receive the request this round.
547         # This method returns True if there's a real result, False if
548         # there are no more servers available, otherwise None.
549         if isinstance(result, Exception):
550             return None
551         result, tried_server_count = result
552         if (result is not None) and (result is not False):
553             return True
554         elif tried_server_count < 1:
555             _logger.info("No more Keep services to try; giving up")
556             return False
557         else:
558             return None
559
560     def get(self, loc_s, num_retries=0):
561         """Get data from Keep.
562
563         This method fetches one or more blocks of data from Keep.  It
564         sends a request each Keep service registered with the API
565         server (or the proxy provided when this client was
566         instantiated), then each service named in location hints, in
567         sequence.  As soon as one service provides the data, it's
568         returned.
569
570         Arguments:
571         * loc_s: A string of one or more comma-separated locators to fetch.
572           This method returns the concatenation of these blocks.
573         * num_retries: The number of times to retry GET requests to
574           *each* Keep server if it returns temporary failures, with
575           exponential backoff.  Note that, in each loop, the method may try
576           to fetch data from every available Keep service, along with any
577           that are named in location hints in the locator.  Default 0.
578         """
579         if ',' in loc_s:
580             return ''.join(self.get(x) for x in loc_s.split(','))
581         locator = KeepLocator(loc_s)
582         expect_hash = locator.md5sum
583
584         slot, first = self.block_cache.reserve_cache(expect_hash)
585         if not first:
586             v = slot.get()
587             return v
588
589         # See #3147 for a discussion of the loop implementation.  Highlights:
590         # * Refresh the list of Keep services after each failure, in case
591         #   it's being updated.
592         # * Retry until we succeed, we're out of retries, or every available
593         #   service has returned permanent failure.
594         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
595                       for hint in locator.hints if hint.startswith('K@')]
596         # Map root URLs their KeepService objects.
597         roots_map = {root: self.KeepService(root) for root in hint_roots}
598         blob = None
599         loop = retry.RetryLoop(num_retries, self._check_loop_result,
600                                backoff_start=2)
601         for tries_left in loop:
602             try:
603                 local_roots = self.map_new_services(
604                     roots_map, expect_hash,
605                     force_rebuild=(tries_left < num_retries))
606             except Exception as error:
607                 loop.save_result(error)
608                 continue
609
610             # Query KeepService objects that haven't returned
611             # permanent failure, in our specified shuffle order.
612             services_to_try = [roots_map[root]
613                                for root in (local_roots + hint_roots)
614                                if roots_map[root].usable()]
615             http = httplib2.Http(timeout=self.timeout)
616             for keep_service in services_to_try:
617                 blob = keep_service.get(http, locator)
618                 if blob is not None:
619                     break
620             loop.save_result((blob, len(services_to_try)))
621
622         # Always cache the result, then return it if we succeeded.
623         slot.set(blob)
624         self.block_cache.cap_cache()
625         if loop.success():
626             return blob
627
628         # No servers fulfilled the request.  Count how many responded
629         # "not found;" if the ratio is high enough (currently 75%), report
630         # Not Found; otherwise a generic error.
631         # Q: Including 403 is necessary for the Keep tests to continue
632         # passing, but maybe they should expect KeepReadError instead?
633         not_founds = sum(1 for ks in roots_map.values()
634                          if ks.last_status() in set([403, 404, 410]))
635         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
636             raise arvados.errors.NotFoundError(loc_s)
637         else:
638             raise arvados.errors.KeepReadError(loc_s)
639
640     def put(self, data, copies=2, num_retries=0):
641         """Save data in Keep.
642
643         This method will get a list of Keep services from the API server, and
644         send the data to each one simultaneously in a new thread.  Once the
645         uploads are finished, if enough copies are saved, this method returns
646         the most recent HTTP response body.  If requests fail to upload
647         enough copies, this method raises KeepWriteError.
648
649         Arguments:
650         * data: The string of data to upload.
651         * copies: The number of copies that the user requires be saved.
652           Default 2.
653         * num_retries: The number of times to retry PUT requests to
654           *each* Keep server if it returns temporary failures, with
655           exponential backoff.  Default 0.
656         """
657         data_hash = hashlib.md5(data).hexdigest()
658         if copies < 1:
659             return data_hash
660
661         headers = {}
662         if self.using_proxy:
663             # Tell the proxy how many copies we want it to store
664             headers['X-Keep-Desired-Replication'] = str(copies)
665         roots_map = {}
666         thread_limiter = KeepClient.ThreadLimiter(copies)
667         loop = retry.RetryLoop(num_retries, self._check_loop_result,
668                                backoff_start=2)
669         for tries_left in loop:
670             try:
671                 local_roots = self.map_new_services(
672                     roots_map, data_hash,
673                     force_rebuild=(tries_left < num_retries), **headers)
674             except Exception as error:
675                 loop.save_result(error)
676                 continue
677
678             threads = []
679             for service_root, ks in roots_map.iteritems():
680                 if ks.finished():
681                     continue
682                 t = KeepClient.KeepWriterThread(
683                     ks,
684                     data=data,
685                     data_hash=data_hash,
686                     service_root=service_root,
687                     thread_limiter=thread_limiter,
688                     timeout=self.timeout)
689                 t.start()
690                 threads.append(t)
691             for t in threads:
692                 t.join()
693             loop.save_result((thread_limiter.done() >= copies, len(threads)))
694
695         if loop.success():
696             return thread_limiter.response()
697         raise arvados.errors.KeepWriteError(
698             "Write fail for %s: wanted %d but wrote %d" %
699             (data_hash, copies, thread_limiter.done()))
700
701     def local_store_put(self, data):
702         md5 = hashlib.md5(data).hexdigest()
703         locator = '%s+%d' % (md5, len(data))
704         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
705             f.write(data)
706         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
707                   os.path.join(self.local_store, md5))
708         return locator
709
710     def local_store_get(self, loc_s):
711         try:
712             locator = KeepLocator(loc_s)
713         except ValueError:
714             raise arvados.errors.NotFoundError(
715                 "Invalid data locator: '%s'" % loc_s)
716         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
717             return ''
718         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
719             return f.read()