3147: Fix variable name typo.
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import timer
21 import datetime
22 import ssl
23 import socket
24
25 _logger = logging.getLogger('arvados.keep')
26 global_client_object = None
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33
34 class KeepLocator(object):
35     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
36     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37
38     def __init__(self, locator_str):
39         self.hints = []
40         self._perm_sig = None
41         self._perm_expiry = None
42         pieces = iter(locator_str.split('+'))
43         self.md5sum = next(pieces)
44         try:
45             self.size = int(next(pieces))
46         except StopIteration:
47             self.size = None
48         for hint in pieces:
49             if self.HINT_RE.match(hint) is None:
50                 raise ValueError("unrecognized hint data {}".format(hint))
51             elif hint.startswith('A'):
52                 self.parse_permission_hint(hint)
53             else:
54                 self.hints.append(hint)
55
56     def __str__(self):
57         return '+'.join(
58             str(s) for s in [self.md5sum, self.size,
59                              self.permission_hint()] + self.hints
60             if s is not None)
61
62     def _make_hex_prop(name, length):
63         # Build and return a new property with the given name that
64         # must be a hex string of the given length.
65         data_name = '_{}'.format(name)
66         def getter(self):
67             return getattr(self, data_name)
68         def setter(self, hex_str):
69             if not arvados.util.is_hex(hex_str, length):
70                 raise ValueError("{} must be a {}-digit hex string: {}".
71                                  format(name, length, hex_str))
72             setattr(self, data_name, hex_str)
73         return property(getter, setter)
74
75     md5sum = _make_hex_prop('md5sum', 32)
76     perm_sig = _make_hex_prop('perm_sig', 40)
77
78     @property
79     def perm_expiry(self):
80         return self._perm_expiry
81
82     @perm_expiry.setter
83     def perm_expiry(self, value):
84         if not arvados.util.is_hex(value, 1, 8):
85             raise ValueError(
86                 "permission timestamp must be a hex Unix timestamp: {}".
87                 format(value))
88         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
89
90     def permission_hint(self):
91         data = [self.perm_sig, self.perm_expiry]
92         if None in data:
93             return None
94         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
95         return "A{}@{:08x}".format(*data)
96
97     def parse_permission_hint(self, s):
98         try:
99             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
100         except IndexError:
101             raise ValueError("bad permission hint {}".format(s))
102
103     def permission_expired(self, as_of_dt=None):
104         if self.perm_expiry is None:
105             return False
106         elif as_of_dt is None:
107             as_of_dt = datetime.datetime.now()
108         return self.perm_expiry <= as_of_dt
109
110
111 class Keep(object):
112     """Simple interface to a global KeepClient object.
113
114     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
115     own API client.  The global KeepClient will build an API client from the
116     current Arvados configuration, which may not match the one you built.
117     """
118     _last_key = None
119
120     @classmethod
121     def global_client_object(cls):
122         global global_client_object
123         # Previously, KeepClient would change its behavior at runtime based
124         # on these configuration settings.  We simulate that behavior here
125         # by checking the values and returning a new KeepClient if any of
126         # them have changed.
127         key = (config.get('ARVADOS_API_HOST'),
128                config.get('ARVADOS_API_TOKEN'),
129                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
130                config.get('ARVADOS_KEEP_PROXY'),
131                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
132                os.environ.get('KEEP_LOCAL_STORE'))
133         if (global_client_object is None) or (cls._last_key != key):
134             global_client_object = KeepClient()
135             cls._last_key = key
136         return global_client_object
137
138     @staticmethod
139     def get(locator, **kwargs):
140         return Keep.global_client_object().get(locator, **kwargs)
141
142     @staticmethod
143     def put(data, **kwargs):
144         return Keep.global_client_object().put(data, **kwargs)
145
146 class KeepBlockCache(object):
147     # Default RAM cache is 256MiB
148     def __init__(self, cache_max=(256 * 1024 * 1024)):
149         self.cache_max = cache_max
150         self._cache = []
151         self._cache_lock = threading.Lock()
152
153     class CacheSlot(object):
154         def __init__(self, locator):
155             self.locator = locator
156             self.ready = threading.Event()
157             self.content = None
158
159         def get(self):
160             self.ready.wait()
161             return self.content
162
163         def set(self, value):
164             self.content = value
165             self.ready.set()
166
167         def size(self):
168             if self.content == None:
169                 return 0
170             else:
171                 return len(self.content)
172
173     def cap_cache(self):
174         '''Cap the cache size to self.cache_max'''
175         self._cache_lock.acquire()
176         try:
177             # Select all slots except those where ready.is_set() and content is
178             # None (that means there was an error reading the block).
179             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
180             sm = sum([slot.size() for slot in self._cache])
181             while len(self._cache) > 0 and sm > self.cache_max:
182                 for i in xrange(len(self._cache)-1, -1, -1):
183                     if self._cache[i].ready.is_set():
184                         del self._cache[i]
185                         break
186                 sm = sum([slot.size() for slot in self._cache])
187         finally:
188             self._cache_lock.release()
189
190     def reserve_cache(self, locator):
191         '''Reserve a cache slot for the specified locator,
192         or return the existing slot.'''
193         self._cache_lock.acquire()
194         try:
195             # Test if the locator is already in the cache
196             for i in xrange(0, len(self._cache)):
197                 if self._cache[i].locator == locator:
198                     n = self._cache[i]
199                     if i != 0:
200                         # move it to the front
201                         del self._cache[i]
202                         self._cache.insert(0, n)
203                     return n, False
204
205             # Add a new cache slot for the locator
206             n = KeepBlockCache.CacheSlot(locator)
207             self._cache.insert(0, n)
208             return n, True
209         finally:
210             self._cache_lock.release()
211
212 class KeepClient(object):
213     class ThreadLimiter(object):
214         """
215         Limit the number of threads running at a given time to
216         {desired successes} minus {successes reported}. When successes
217         reported == desired, wake up the remaining threads and tell
218         them to quit.
219
220         Should be used in a "with" block.
221         """
222         def __init__(self, todo):
223             self._todo = todo
224             self._done = 0
225             self._response = None
226             self._todo_lock = threading.Semaphore(todo)
227             self._done_lock = threading.Lock()
228
229         def __enter__(self):
230             self._todo_lock.acquire()
231             return self
232
233         def __exit__(self, type, value, traceback):
234             self._todo_lock.release()
235
236         def shall_i_proceed(self):
237             """
238             Return true if the current thread should do stuff. Return
239             false if the current thread should just stop.
240             """
241             with self._done_lock:
242                 return (self._done < self._todo)
243
244         def save_response(self, response_body, replicas_stored):
245             """
246             Records a response body (a locator, possibly signed) returned by
247             the Keep server.  It is not necessary to save more than
248             one response, since we presume that any locator returned
249             in response to a successful request is valid.
250             """
251             with self._done_lock:
252                 self._done += replicas_stored
253                 self._response = response_body
254
255         def response(self):
256             """
257             Returns the body from the response to a PUT request.
258             """
259             with self._done_lock:
260                 return self._response
261
262         def done(self):
263             """
264             Return how many successes were reported.
265             """
266             with self._done_lock:
267                 return self._done
268
269
270     class KeepService(object):
271         # Make requests to a single Keep service, and track results.
272         HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
273                        socket.error, ssl.SSLError)
274
275         def __init__(self, root, **headers):
276             self.root = root
277             self.last_result = None
278             self.success_flag = None
279             self.get_headers = {'Accept': 'application/octet-stream'}
280             self.get_headers.update(headers)
281             self.put_headers = headers
282
283         def usable(self):
284             return self.success_flag is not False
285
286         def finished(self):
287             return self.success_flag is not None
288
289         def last_status(self):
290             try:
291                 return int(self.last_result[0].status)
292             except (AttributeError, IndexError, ValueError):
293                 return None
294
295         def get(self, http, locator):
296             # http is an httplib2.Http object.
297             # locator is a KeepLocator object.
298             url = self.root + str(locator)
299             _logger.debug("Request: GET %s", url)
300             try:
301                 with timer.Timer() as t:
302                     result = http.request(url.encode('utf-8'), 'GET',
303                                           headers=self.get_headers)
304             except self.HTTP_ERRORS as e:
305                 _logger.debug("Request fail: GET %s => %s: %s",
306                               url, type(e), str(e))
307                 self.last_result = e
308             else:
309                 self.last_result = result
310                 self.success_flag = retry.check_http_response_success(result)
311                 content = result[1]
312                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
313                              self.last_status(), len(content), t.msecs,
314                              (len(content)/(1024.0*1024))/t.secs)
315                 if self.success_flag:
316                     resp_md5 = hashlib.md5(content).hexdigest()
317                     if resp_md5 == locator.md5sum:
318                         return content
319                     _logger.warning("Checksum fail: md5(%s) = %s",
320                                     url, resp_md5)
321             return None
322
323         def put(self, http, hash_s, body):
324             url = self.root + hash_s
325             _logger.debug("Request: PUT %s", url)
326             try:
327                 result = http.request(url.encode('utf-8'), 'PUT',
328                                       headers=self.put_headers, body=body)
329             except self.HTTP_ERRORS as e:
330                 _logger.debug("Request fail: PUT %s => %s: %s",
331                               url, type(e), str(e))
332                 self.last_result = e
333             else:
334                 self.last_result = result
335                 self.success_flag = retry.check_http_response_success(result)
336             return self.success_flag
337
338
339     class KeepWriterThread(threading.Thread):
340         """
341         Write a blob of data to the given Keep server. On success, call
342         save_response() of the given ThreadLimiter to save the returned
343         locator.
344         """
345         def __init__(self, keep_service, **kwargs):
346             super(KeepClient.KeepWriterThread, self).__init__()
347             self.service = keep_service
348             self.args = kwargs
349             self._success = False
350
351         def success(self):
352             return self._success
353
354         def run(self):
355             with self.args['thread_limiter'] as limiter:
356                 if not limiter.shall_i_proceed():
357                     # My turn arrived, but the job has been done without
358                     # me.
359                     return
360                 self.run_with_limiter(limiter)
361
362         def run_with_limiter(self, limiter):
363             if self.service.finished():
364                 return
365             _logger.debug("KeepWriterThread %s proceeding %s %s",
366                           str(threading.current_thread()),
367                           self.args['data_hash'],
368                           self.args['service_root'])
369             h = httplib2.Http(timeout=self.args.get('timeout', None))
370             self._success = bool(self.service.put(
371                     h, self.args['data_hash'], self.args['data']))
372             status = self.service.last_status()
373             if self._success:
374                 resp, body = self.service.last_result
375                 _logger.debug("KeepWriterThread %s succeeded %s %s",
376                               str(threading.current_thread()),
377                               self.args['data_hash'],
378                               self.args['service_root'])
379                 # Tick the 'done' counter for the number of replica
380                 # reported stored by the server, for the case that
381                 # we're talking to a proxy or other backend that
382                 # stores to multiple copies for us.
383                 try:
384                     replicas_stored = int(resp['x-keep-replicas-stored'])
385                 except (KeyError, ValueError):
386                     replicas_stored = 1
387                 limiter.save_response(body.strip(), replicas_stored)
388             elif status is not None:
389                 _logger.debug("Request fail: PUT %s => %s %s",
390                               self.args['data_hash'], status,
391                               self.service.last_result[1])
392
393
394     def __init__(self, api_client=None, proxy=None, timeout=300,
395                  api_token=None, local_store=None, block_cache=None):
396         """Initialize a new KeepClient.
397
398         Arguments:
399         * api_client: The API client to use to find Keep services.  If not
400           provided, KeepClient will build one from available Arvados
401           configuration.
402         * proxy: If specified, this KeepClient will send requests to this
403           Keep proxy.  Otherwise, KeepClient will fall back to the setting
404           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
405           ensure KeepClient does not use a proxy, pass in an empty string.
406         * timeout: The timeout for all HTTP requests, in seconds.  Default
407           300.
408         * api_token: If you're not using an API client, but only talking
409           directly to a Keep proxy, this parameter specifies an API token
410           to authenticate Keep requests.  It is an error to specify both
411           api_client and api_token.  If you specify neither, KeepClient
412           will use one available from the Arvados configuration.
413         * local_store: If specified, this KeepClient will bypass Keep
414           services, and save data to the named directory.  If unspecified,
415           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
416           environment variable.  If you want to ensure KeepClient does not
417           use local storage, pass in an empty string.  This is primarily
418           intended to mock a server for testing.
419         """
420         self.lock = threading.Lock()
421         if proxy is None:
422             proxy = config.get('ARVADOS_KEEP_PROXY')
423         if api_token is None:
424             api_token = config.get('ARVADOS_API_TOKEN')
425         elif api_client is not None:
426             raise ValueError(
427                 "can't build KeepClient with both API client and token")
428         if local_store is None:
429             local_store = os.environ.get('KEEP_LOCAL_STORE')
430
431         self.block_cache = block_cache if block_cache else KeepBlockCache()
432
433         if local_store:
434             self.local_store = local_store
435             self.get = self.local_store_get
436             self.put = self.local_store_put
437         else:
438             self.timeout = timeout
439
440             if proxy:
441                 if not proxy.endswith('/'):
442                     proxy += '/'
443                 self.api_token = api_token
444                 self.service_roots = [proxy]
445                 self.using_proxy = True
446                 self.static_service_roots = True
447             else:
448                 # It's important to avoid instantiating an API client
449                 # unless we actually need one, for testing's sake.
450                 if api_client is None:
451                     api_client = arvados.api('v1')
452                 self.api_client = api_client
453                 self.api_token = api_client.api_token
454                 self.service_roots = None
455                 self.using_proxy = None
456                 self.static_service_roots = False
457
458     def build_service_roots(self, force_rebuild=False):
459         if (self.static_service_roots or
460               (self.service_roots and not force_rebuild)):
461             return
462         with self.lock:
463             try:
464                 keep_services = self.api_client.keep_services().accessible()
465             except Exception:  # API server predates Keep services.
466                 keep_services = self.api_client.keep_disks().list()
467
468             keep_services = keep_services.execute().get('items')
469             if not keep_services:
470                 raise arvados.errors.NoKeepServersError()
471
472             self.using_proxy = (keep_services[0].get('service_type') ==
473                                 'proxy')
474
475             roots = (("http%s://%s:%d/" %
476                       ('s' if f['service_ssl_flag'] else '',
477                        f['service_host'],
478                        f['service_port']))
479                      for f in keep_services)
480             self.service_roots = sorted(set(roots))
481             _logger.debug(str(self.service_roots))
482
483     def shuffled_service_roots(self, hash, force_rebuild=False):
484         self.build_service_roots(force_rebuild)
485
486         # Build an ordering with which to query the Keep servers based on the
487         # contents of the hash.
488         # "hash" is a hex-encoded number at least 8 digits
489         # (32 bits) long
490
491         # seed used to calculate the next keep server from 'pool'
492         # to be added to 'pseq'
493         seed = hash
494
495         # Keep servers still to be added to the ordering
496         pool = self.service_roots[:]
497
498         # output probe sequence
499         pseq = []
500
501         # iterate while there are servers left to be assigned
502         while len(pool) > 0:
503             if len(seed) < 8:
504                 # ran out of digits in the seed
505                 if len(pseq) < len(hash) / 4:
506                     # the number of servers added to the probe sequence is less
507                     # than the number of 4-digit slices in 'hash' so refill the
508                     # seed with the last 4 digits and then append the contents
509                     # of 'hash'.
510                     seed = hash[-4:] + hash
511                 else:
512                     # refill the seed with the contents of 'hash'
513                     seed += hash
514
515             # Take the next 8 digits (32 bytes) and interpret as an integer,
516             # then modulus with the size of the remaining pool to get the next
517             # selected server.
518             probe = int(seed[0:8], 16) % len(pool)
519
520             # Append the selected server to the probe sequence and remove it
521             # from the pool.
522             pseq += [pool[probe]]
523             pool = pool[:probe] + pool[probe+1:]
524
525             # Remove the digits just used from the seed
526             seed = seed[8:]
527         _logger.debug(str(pseq))
528         return pseq
529
530
531     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
532         # roots_map is a dictionary, mapping Keep service root strings
533         # to KeepService objects.  Poll for Keep services, and add any
534         # new ones to roots_map.  Return the current list of local
535         # root strings.
536         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
537         local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
538         for root in local_roots:
539             if root not in roots_map:
540                 roots_map[root] = self.KeepService(root, **headers)
541         return local_roots
542
543     @staticmethod
544     def _check_loop_result(result):
545         # KeepClient RetryLoops should save results as a 2-tuple: the
546         # actual result of the request, and the number of servers available
547         # to receive the request this round.
548         # This method returns True if there's a real result, False if
549         # there are no more servers available, otherwise None.
550         if isinstance(result, Exception):
551             return None
552         result, tried_server_count = result
553         if (result is not None) and (result is not False):
554             return True
555         elif tried_server_count < 1:
556             _logger.info("No more Keep services to try; giving up")
557             return False
558         else:
559             return None
560
561     def get(self, loc_s, num_retries=0):
562         """Get data from Keep.
563
564         This method fetches one or more blocks of data from Keep.  It
565         sends a request each Keep service registered with the API
566         server (or the proxy provided when this client was
567         instantiated), then each service named in location hints, in
568         sequence.  As soon as one service provides the data, it's
569         returned.
570
571         Arguments:
572         * loc_s: A string of one or more comma-separated locators to fetch.
573           This method returns the concatenation of these blocks.
574         * num_retries: The number of times to retry GET requests to
575           *each* Keep server if it returns temporary failures, with
576           exponential backoff.  Note that, in each loop, the method may try
577           to fetch data from every available Keep service, along with any
578           that are named in location hints in the locator.  Default 0.
579         """
580         if ',' in loc_s:
581             return ''.join(self.get(x) for x in loc_s.split(','))
582         locator = KeepLocator(loc_s)
583         expect_hash = locator.md5sum
584
585         slot, first = self.block_cache.reserve_cache(expect_hash)
586         if not first:
587             v = slot.get()
588             return v
589
590         # See #3147 for a discussion of the loop implementation.  Highlights:
591         # * Refresh the list of Keep services after each failure, in case
592         #   it's being updated.
593         # * Retry until we succeed, we're out of retries, or every available
594         #   service has returned permanent failure.
595         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
596                       for hint in locator.hints if hint.startswith('K@')]
597         # Map root URLs their KeepService objects.
598         roots_map = {root: self.KeepService(root) for root in hint_roots}
599         blob = None
600         loop = retry.RetryLoop(num_retries, self._check_loop_result,
601                                backoff_start=2)
602         for tries_left in loop:
603             try:
604                 local_roots = self.map_new_services(
605                     roots_map, expect_hash,
606                     force_rebuild=(tries_left < num_retries))
607             except Exception as error:
608                 loop.save_result(error)
609                 continue
610
611             # Query KeepService objects that haven't returned
612             # permanent failure, in our specified shuffle order.
613             services_to_try = [roots_map[root]
614                                for root in (local_roots + hint_roots)
615                                if roots_map[root].usable()]
616             http = httplib2.Http(timeout=self.timeout)
617             for keep_service in services_to_try:
618                 blob = keep_service.get(http, locator)
619                 if blob is not None:
620                     break
621             loop.save_result((blob, len(services_to_try)))
622
623         # Always cache the result, then return it if we succeeded.
624         slot.set(blob)
625         self.block_cache.cap_cache()
626         if loop.success():
627             return blob
628
629         # No servers fulfilled the request.  Count how many responded
630         # "not found;" if the ratio is high enough (currently 75%), report
631         # Not Found; otherwise a generic error.
632         # Q: Including 403 is necessary for the Keep tests to continue
633         # passing, but maybe they should expect KeepReadError instead?
634         not_founds = sum(1 for ks in roots_map.values()
635                          if ks.last_status() in set([403, 404, 410]))
636         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
637             raise arvados.errors.NotFoundError(loc_s)
638         else:
639             raise arvados.errors.KeepReadError(loc_s)
640
641     def put(self, data, copies=2, num_retries=0):
642         """Save data in Keep.
643
644         This method will get a list of Keep services from the API server, and
645         send the data to each one simultaneously in a new thread.  Once the
646         uploads are finished, if enough copies are saved, this method returns
647         the most recent HTTP response body.  If requests fail to upload
648         enough copies, this method raises KeepWriteError.
649
650         Arguments:
651         * data: The string of data to upload.
652         * copies: The number of copies that the user requires be saved.
653           Default 2.
654         * num_retries: The number of times to retry PUT requests to
655           *each* Keep server if it returns temporary failures, with
656           exponential backoff.  Default 0.
657         """
658         data_hash = hashlib.md5(data).hexdigest()
659         if copies < 1:
660             return data_hash
661
662         headers = {}
663         if self.using_proxy:
664             # Tell the proxy how many copies we want it to store
665             headers['X-Keep-Desired-Replication'] = str(copies)
666         roots_map = {}
667         thread_limiter = KeepClient.ThreadLimiter(copies)
668         loop = retry.RetryLoop(num_retries, self._check_loop_result,
669                                backoff_start=2)
670         for tries_left in loop:
671             try:
672                 local_roots = self.map_new_services(
673                     roots_map, data_hash,
674                     force_rebuild=(tries_left < num_retries), **headers)
675             except Exception as error:
676                 loop.save_result(error)
677                 continue
678
679             threads = []
680             for service_root, ks in roots_map.iteritems():
681                 if ks.finished():
682                     continue
683                 t = KeepClient.KeepWriterThread(
684                     ks,
685                     data=data,
686                     data_hash=data_hash,
687                     service_root=service_root,
688                     thread_limiter=thread_limiter,
689                     timeout=self.timeout)
690                 t.start()
691                 threads.append(t)
692             for t in threads:
693                 t.join()
694             loop.save_result((thread_limiter.done() >= copies, len(threads)))
695
696         if loop.success():
697             return thread_limiter.response()
698         raise arvados.errors.KeepWriteError(
699             "Write fail for %s: wanted %d but wrote %d" %
700             (data_hash, copies, thread_limiter.done()))
701
702     def local_store_put(self, data):
703         md5 = hashlib.md5(data).hexdigest()
704         locator = '%s+%d' % (md5, len(data))
705         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
706             f.write(data)
707         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
708                   os.path.join(self.local_store, md5))
709         return locator
710
711     def local_store_get(self, loc_s):
712         try:
713             locator = KeepLocator(loc_s)
714         except ValueError:
715             raise arvados.errors.NotFoundError(
716                 "Invalid data locator: '%s'" % loc_s)
717         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
718             return ''
719         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
720             return f.read()