3147: Remove old Keep signing support from Python SDK.
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import timer
21 import datetime
22 import ssl
23
# Logger used by every class in this module.
_logger = logging.getLogger('arvados.keep')
# Shared KeepClient instance handed out (and rebuilt when relevant
# configuration changes) by Keep.global_client_object().  None until the
# first call.
global_client_object = None
26
27 import arvados
28 import arvados.config as config
29 import arvados.errors
30 import arvados.util
31
32 class KeepLocator(object):
33     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
34     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
35
36     def __init__(self, locator_str):
37         self.hints = []
38         self._perm_sig = None
39         self._perm_expiry = None
40         pieces = iter(locator_str.split('+'))
41         self.md5sum = next(pieces)
42         try:
43             self.size = int(next(pieces))
44         except StopIteration:
45             self.size = None
46         for hint in pieces:
47             if self.HINT_RE.match(hint) is None:
48                 raise ValueError("unrecognized hint data {}".format(hint))
49             elif hint.startswith('A'):
50                 self.parse_permission_hint(hint)
51             else:
52                 self.hints.append(hint)
53
54     def __str__(self):
55         return '+'.join(
56             str(s) for s in [self.md5sum, self.size,
57                              self.permission_hint()] + self.hints
58             if s is not None)
59
60     def _make_hex_prop(name, length):
61         # Build and return a new property with the given name that
62         # must be a hex string of the given length.
63         data_name = '_{}'.format(name)
64         def getter(self):
65             return getattr(self, data_name)
66         def setter(self, hex_str):
67             if not arvados.util.is_hex(hex_str, length):
68                 raise ValueError("{} must be a {}-digit hex string: {}".
69                                  format(name, length, hex_str))
70             setattr(self, data_name, hex_str)
71         return property(getter, setter)
72
73     md5sum = _make_hex_prop('md5sum', 32)
74     perm_sig = _make_hex_prop('perm_sig', 40)
75
76     @property
77     def perm_expiry(self):
78         return self._perm_expiry
79
80     @perm_expiry.setter
81     def perm_expiry(self, value):
82         if not arvados.util.is_hex(value, 1, 8):
83             raise ValueError(
84                 "permission timestamp must be a hex Unix timestamp: {}".
85                 format(value))
86         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
87
88     def permission_hint(self):
89         data = [self.perm_sig, self.perm_expiry]
90         if None in data:
91             return None
92         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
93         return "A{}@{:08x}".format(*data)
94
95     def parse_permission_hint(self, s):
96         try:
97             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
98         except IndexError:
99             raise ValueError("bad permission hint {}".format(s))
100
101     def permission_expired(self, as_of_dt=None):
102         if self.perm_expiry is None:
103             return False
104         elif as_of_dt is None:
105             as_of_dt = datetime.datetime.now()
106         return self.perm_expiry <= as_of_dt
107
108
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Snapshot of the settings the current global client was built with.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if settings changed.

        Older KeepClient versions re-read these settings at runtime.  To
        emulate that, the current values are compared against the snapshot
        taken when the shared client was last built, and a fresh KeepClient
        is created whenever they differ (or none exists yet).
        """
        global global_client_object
        settings_now = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        stale = (global_client_object is None or
                 settings_now != cls._last_key)
        if stale:
            global_client_object = KeepClient()
            cls._last_key = settings_now
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch `locator` through the shared global KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store `data` through the shared global KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
144
class KeepClient(object):
    """Store and fetch data blocks in Keep.

    Depending on how it is constructed, a KeepClient talks to the Keep
    services listed by the Arvados API server, to a single Keep proxy, or
    (when a local store directory is configured) to the local filesystem.
    Blocks fetched over HTTP are kept in an in-memory cache of at most
    `cache_max` bytes.
    """

    class ThreadLimiter(object):
        """Coordinate KeepWriterThreads racing to store enough replicas.

        At most `todo` threads may be inside the "with" block at once.
        Threads that enter after the reported successes have reached `todo`
        are told (via shall_i_proceed) to stop without doing any work.

        Should be used in a "with" block.
        """
        def __init__(self, todo):
            self._todo = todo
            self._done = 0
            self._response = None
            # Admits at most `todo` threads into the "with" block at a time.
            self._todo_lock = threading.Semaphore(todo)
            # Protects _done and _response.
            self._done_lock = threading.Lock()

        def __enter__(self):
            self._todo_lock.acquire()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def shall_i_proceed(self):
            """
            Return true if the current thread should do stuff. Return
            false if the current thread should just stop.
            """
            with self._done_lock:
                return (self._done < self._todo)

        def save_response(self, response_body, replicas_stored):
            """
            Record a response body (a locator, possibly signed) returned by
            the Keep server and count `replicas_stored` toward the goal.
            It is not necessary to save more than one response, since we
            presume that any locator returned in response to a successful
            request is valid.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """
            Returns the body from the response to a PUT request.
            """
            with self._done_lock:
                return self._response

        def done(self):
            """
            Return how many successes were reported.
            """
            with self._done_lock:
                return self._done


    class KeepWriterThread(threading.Thread):
        """
        Write a blob of data to the given Keep server. On success, call
        save_response() of the given ThreadLimiter to save the returned
        locator.

        Keyword arguments (as supplied by KeepClient.put): data, data_hash,
        service_root, thread_limiter, using_proxy, want_copies, and
        optionally timeout.
        """
        def __init__(self, api_token, **kwargs):
            super(KeepClient.KeepWriterThread, self).__init__()
            self._api_token = api_token
            self.args = kwargs
            self._success = False

        def success(self):
            """Return True if this thread's PUT received a 2xx response."""
            return self._success

        def run(self):
            with self.args['thread_limiter'] as limiter:
                if not limiter.shall_i_proceed():
                    # My turn arrived, but the job has been done without
                    # me.
                    return
                self.run_with_limiter(limiter)

        def run_with_limiter(self, limiter):
            """PUT the data to this thread's service and report the result."""
            _logger.debug("KeepWriterThread %s proceeding %s %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          self.args['service_root'])
            h = httplib2.Http(timeout=self.args.get('timeout', None))
            url = self.args['service_root'] + self.args['data_hash']
            headers = {'Authorization': "OAuth2 %s" % (self._api_token,)}

            if self.args['using_proxy']:
                # We're using a proxy, so tell the proxy how many copies we
                # want it to store
                headers['X-Keep-Desired-Replication'] = str(self.args['want_copies'])

            try:
                _logger.debug("Uploading to {}".format(url))
                resp, content = h.request(url.encode('utf-8'), 'PUT',
                                          headers=headers,
                                          body=self.args['data'])
                if re.match(r'^2\d\d$', resp['status']):
                    self._success = True
                    _logger.debug("KeepWriterThread %s succeeded %s %s",
                                  str(threading.current_thread()),
                                  self.args['data_hash'],
                                  self.args['service_root'])
                    replicas_stored = 1
                    if 'x-keep-replicas-stored' in resp:
                        # Tick the 'done' counter for the number of replica
                        # reported stored by the server, for the case that
                        # we're talking to a proxy or other backend that
                        # stores to multiple copies for us.
                        try:
                            replicas_stored = int(resp['x-keep-replicas-stored'])
                        except ValueError:
                            pass
                    limiter.save_response(content.strip(), replicas_stored)
                else:
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  url, resp['status'], content)
            except (httplib2.HttpLib2Error,
                    httplib.HTTPException,
                    ssl.SSLError) as e:
                # When using https, timeouts look like ssl.SSLError from here.
                # "SSLError: The write operation timed out"
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(e), str(e))


    def __init__(self, api_client=None, proxy=None, timeout=60,
                 api_token=None, local_store=None):
        """Initialize a new KeepClient.

        Arguments:
        * api_client: The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.
        * proxy: If specified, this KeepClient will send requests to this
          Keep proxy.  Otherwise, KeepClient will fall back to the setting
          of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
          ensure KeepClient does not use a proxy, pass in an empty string.
        * timeout: The timeout for all HTTP requests, in seconds.  Default
          60.
        * api_token: If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.
        * local_store: If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        Raises ValueError if both api_client and api_token are given.
        """
        self.lock = threading.Lock()
        # Set unconditionally so every HTTP code path can rely on it.
        self.timeout = timeout
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            api_token = config.get('ARVADOS_API_TOKEN')
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if local_store:
            self.local_store = local_store
            # Route the public API to the filesystem implementations.
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
            self._cache = []
            self._cache_lock = threading.Lock()
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self.service_roots = [proxy]
                self.using_proxy = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                # Discovered lazily by shuffled_service_roots().
                self.service_roots = None
                self.using_proxy = None

    def _load_service_roots(self):
        """Discover Keep services via the API server.  Caller holds self.lock.

        Sets self.using_proxy from the first service's type, then sets
        self.service_roots to the sorted, de-duplicated base URLs.
        Raises arvados.errors.NoKeepServersError if none are listed.
        """
        try:
            keep_services = self.api_client.keep_services().accessible()
        except Exception:  # API server predates Keep services.
            keep_services = self.api_client.keep_disks().list()

        keep_services = keep_services.execute().get('items')
        if not keep_services:
            raise arvados.errors.NoKeepServersError()

        self.using_proxy = (keep_services[0].get('service_type') ==
                            'proxy')

        roots = (("http%s://%s:%d/" %
                  ('s' if f['service_ssl_flag'] else '',
                   f['service_host'],
                   f['service_port']))
                 for f in keep_services)
        # Assigned last: other threads treat a non-None value as "ready".
        self.service_roots = sorted(set(roots))
        _logger.debug(str(self.service_roots))

    def shuffled_service_roots(self, hash):
        """Return all Keep service roots in the probe order for `hash`.

        The ordering is derived deterministically from the hex digits of
        `hash`, so every client probes servers in the same order for a
        given block.  Service discovery happens on first use.
        """
        if self.service_roots is None:
            with self.lock:
                # Re-check: another thread may have completed discovery
                # while this one waited for the lock.
                if self.service_roots is None:
                    self._load_service_roots()

        # Build an ordering with which to query the Keep servers based on the
        # contents of the hash.
        # "hash" is a hex-encoded number at least 8 digits
        # (32 bits) long

        # seed used to calculate the next keep server from 'pool'
        # to be added to 'pseq'
        seed = hash

        # Keep servers still to be added to the ordering
        pool = self.service_roots[:]

        # output probe sequence
        pseq = []

        # iterate while there are servers left to be assigned
        while pool:
            if len(seed) < 8:
                # ran out of digits in the seed
                if len(pseq) < len(hash) // 4:
                    # the number of servers added to the probe sequence is less
                    # than the number of 4-digit slices in 'hash' so refill the
                    # seed with the last 4 digits and then append the contents
                    # of 'hash'.
                    seed = hash[-4:] + hash
                else:
                    # refill the seed with the contents of 'hash'
                    seed += hash

            # Take the next 8 hex digits (32 bits) and interpret as an
            # integer, then modulus with the size of the remaining pool to
            # get the next selected server.
            probe = int(seed[0:8], 16) % len(pool)

            # Append the selected server to the probe sequence and remove it
            # from the pool.
            pseq.append(pool[probe])
            pool = pool[:probe] + pool[probe+1:]

            # Remove the digits just used from the seed
            seed = seed[8:]
        _logger.debug(str(pseq))
        return pseq

    class CacheSlot(object):
        """One cache entry; readers block in get() until set() is called."""
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max.

        Drops slots whose fetch finished without data, then evicts from the
        end of the list (least recently used) until the total size fits.
        '''
        with self._cache_lock:
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while self._cache and sm > self.cache_max:
                del self._cache[-1]
                # Recompute over the remaining slots (the old code summed
                # the wrong loop variable here).
                sm = sum(slot.size() for slot in self._cache)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns (slot, first): `first` is True when the slot was newly
        created, meaning the caller is responsible for filling it.  An
        existing slot is moved to the front (most recently used).
        '''
        with self._cache_lock:
            for i, slot in enumerate(self._cache):
                if slot.locator == locator:
                    if i != 0:
                        # move it to the front
                        del self._cache[i]
                        self._cache.insert(0, slot)
                    return slot, False

            # Add a new cache slot for the locator
            slot = KeepClient.CacheSlot(locator)
            self._cache.insert(0, slot)
            return slot, True

    def get(self, loc_s):
        """Return the data for locator string `loc_s`.

        `loc_s` may be several locators joined by commas, in which case the
        blocks' contents are concatenated.  Results are cached by md5sum;
        concurrent requests for the same block wait for the first one.

        Raises arvados.errors.NotFoundError if no source returned data
        matching the locator's md5sum.
        """
        if ',' in loc_s:
            return ''.join(self.get(x) for x in loc_s.split(','))
        locator = KeepLocator(loc_s)
        expect_hash = locator.md5sum

        slot, first = self.reserve_cache(expect_hash)
        if not first:
            # Another call is (or was) fetching this block; wait for its
            # result.  None means that fetch failed.
            blob = slot.get()
            if blob is None:
                raise arvados.errors.NotFoundError(
                    "Block not found: %s" % expect_hash)
            return blob

        blob = None
        try:
            blob = self._fetch_blob(locator, loc_s, expect_hash)
        finally:
            # Always fill the slot, even when an exception escapes, so that
            # callers waiting in slot.get() are released.
            slot.set(blob)
            self.cap_cache()
        if blob is None:
            raise arvados.errors.NotFoundError("Block not found: %s" % expect_hash)
        return blob

    def _fetch_blob(self, locator, loc_s, expect_hash):
        """Try each Keep service, then any 'K@' hint hosts, for the block.

        Returns the first response whose md5 matches `expect_hash`, or None.
        An empty string is a valid (empty) block, so test against None.
        """
        for service_root in self.shuffled_service_roots(expect_hash):
            url = service_root + loc_s
            headers = {'Authorization': "OAuth2 %s" % (self.api_token,),
                       'Accept': 'application/octet-stream'}
            blob = self.get_url(url, headers, expect_hash)
            if blob is not None:
                return blob

        for hint in locator.hints:
            if not hint.startswith('K@'):
                continue
            url = 'http://keep.' + hint[2:] + '.arvadosapi.com/' + loc_s
            blob = self.get_url(url, {}, expect_hash)
            if blob is not None:
                return blob
        return None

    def get_url(self, url, headers, expect_hash):
        """GET `url` and return the body if its md5 equals `expect_hash`.

        Returns None on any request failure, non-2xx status, or checksum
        mismatch; failures are logged rather than raised.
        """
        h = httplib2.Http(timeout=self.timeout)
        try:
            _logger.debug("Request: GET %s", url)
            with timer.Timer() as t:
                resp, content = h.request(url.encode('utf-8'), 'GET',
                                          headers=headers)
            # Float math, and no division by a zero elapsed time (which used
            # to raise, get swallowed below, and discard a good block).
            mib = len(content) / (1024.0 * 1024)
            rate = (mib / t.secs) if t.secs > 0 else float('inf')
            _logger.info("Received %s bytes in %s msec (%s MiB/sec)",
                         len(content), t.msecs, rate)
            if re.match(r'^2\d\d$', resp['status']):
                md5 = hashlib.md5(content).hexdigest()
                if md5 == expect_hash:
                    return content
                _logger.warning("Checksum fail: md5(%s) = %s", url, md5)
            else:
                _logger.debug("Request fail: GET %s => %s",
                              url, resp['status'])
        except Exception as e:
            _logger.debug("Request fail: GET %s => %s: %s",
                          url, type(e), str(e))
        return None

    def put(self, data, copies=2):
        """Store `data` on `copies` Keep servers and return the locator.

        Writes are started in parallel to every service in probe order,
        with a ThreadLimiter stopping extra writers once enough replicas are
        reported.  Services that failed are retried once.  If `copies` is
        not positive, nothing is written and the bare md5 hex digest is
        returned.

        Raises arvados.errors.KeepWriteError if fewer than `copies`
        replicas were stored.
        """
        data_hash = hashlib.md5(data).hexdigest()
        want_copies = copies
        if not (want_copies > 0):
            return data_hash
        threads = []
        thread_limiter = KeepClient.ThreadLimiter(want_copies)
        for service_root in self.shuffled_service_roots(data_hash):
            t = KeepClient.KeepWriterThread(
                self.api_token,
                data=data,
                data_hash=data_hash,
                service_root=service_root,
                thread_limiter=thread_limiter,
                timeout=self.timeout,
                using_proxy=self.using_proxy,
                want_copies=(want_copies if self.using_proxy else 1))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        if thread_limiter.done() < want_copies:
            # Retry the threads (i.e., services) that failed the first
            # time around.
            threads_retry = []
            for t in threads:
                if not t.success():
                    _logger.debug("Retrying: PUT %s %s",
                                  t.args['service_root'],
                                  t.args['data_hash'])
                    retry_with_args = t.args.copy()
                    t_retry = KeepClient.KeepWriterThread(self.api_token,
                                                          **retry_with_args)
                    t_retry.start()
                    threads_retry.append(t_retry)
            for t in threads_retry:
                t.join()
        have_copies = thread_limiter.done()
        # If we're done, return the response from Keep
        if have_copies >= want_copies:
            return thread_limiter.response()
        raise arvados.errors.KeepWriteError(
            "Write fail for %s: wanted %d but wrote %d" %
            (data_hash, want_copies, have_copies))


    def local_store_put(self, data, copies=2):
        """Write `data` into the local store directory; return its locator.

        `copies` is accepted only so this can stand in for put() (including
        calls that pass copies=...); it is ignored.  The data is written to
        a temporary file and then renamed into place.
        """
        md5 = hashlib.md5(data).hexdigest()
        locator = '%s+%d' % (md5, len(data))
        tmp_path = os.path.join(self.local_store, md5 + '.tmp')
        with open(tmp_path, 'wb') as f:
            f.write(data)
        os.rename(tmp_path, os.path.join(self.local_store, md5))
        return locator

    def local_store_get(self, loc_s):
        """Read the block for `loc_s` from the local store directory.

        Raises arvados.errors.NotFoundError if `loc_s` cannot be parsed.
        The empty block is returned without touching the filesystem.
        """
        try:
            locator = KeepLocator(loc_s)
        except ValueError:
            raise arvados.errors.NotFoundError(
                "Invalid data locator: '%s'" % loc_s)
        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
            return ''
        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
            return f.read()
582         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
583             return f.read()