Merge branch '3137-arv-mount-stats' closes #3137
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import threading
13 import timer
14
15 import arvados
16 import arvados.config as config
17 import arvados.errors
18 import arvados.retry as retry
19 import arvados.util
20
21 _logger = logging.getLogger('arvados.keep')
22 global_client_object = None
23
24
class KeepLocator(object):
    """Parsed form of a Keep block locator string.

    A locator looks like ``md5sum[+size[+hint...]]``.  Hints starting with
    ``A`` are parsed as permission hints (``A<signature>@<hex expiry>``);
    every other hint is kept verbatim, in order, in ``self.hints``.
    """
    # Naive datetime for the Unix epoch, in UTC.  Permission expiry
    # datetimes are stored as naive UTC as well.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str.

        Raises ValueError if the hash, the size, or any hint is malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # A bare hash with no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Return the full locator string, omitting any absent component."""
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without hints: ``md5sum+size``, or just the
        hash when no size is known."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.  This runs at class
        # definition time, which is why it takes no `self`.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unset."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # `value` is the hex Unix timestamp exactly as it appears in a hint.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<expiry>`` hint, or None unless both the
        signature and the expiry are set."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint such as ``A<sig>@<hex>``.

        Raises ValueError if the hint has no ``@`` separator, or if either
        half fails the validation done by the property setters.
        """
        sig, separator, expiry = s[1:].partition('@')
        if not separator:
            # Without this check the failure surfaced as an unrelated
            # tuple-unpacking ValueError rather than this message.
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired as of as_of_dt.

        as_of_dt is a naive UTC datetime and defaults to the current UTC
        time.  A locator without an expiry never expires.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is naive UTC, so compare against UTC "now", not
            # local time.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
106
107
class Keep(object):
    """Deprecated convenience wrapper around one shared KeepClient.

    Do not use this in new code: create a KeepClient yourself and hand it
    the API client you want it to use.  The shared client is built purely
    from the current Arvados configuration, which may differ from an API
    client you constructed separately.
    """
    # Configuration snapshot the shared client was last built from.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if the relevant
        configuration has changed since it was created."""
        global global_client_object
        # KeepClient used to react to these settings at runtime.  That is
        # emulated here by replacing the shared client whenever one of the
        # values differs from the snapshot taken at the last build.
        settings = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        still_valid = (global_client_object is not None and
                       cls._last_key == settings)
        if not still_valid:
            global_client_object = KeepClient()
            cls._last_key = settings
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the shared KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
142
class KeepBlockCache(object):
    """In-memory block cache, kept in most-recently-used-first order.

    Slots are reserved before their content is available; readers block in
    CacheSlot.get() until the content has been set.
    """
    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        """cache_max is the total content size, in bytes, that cap_cache()
        trims the cache down to."""
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block's content, filled in after it is created."""
        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until set() has been called, then return the content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the content (None records a failed read) and wake any
            waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the content length, or 0 while there is no content."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least recently used slot that is ready; slots
                # still being filled are left for their waiting readers.
                for i in range(len(self._cache) - 1, -1, -1):
                    slot = self._cache[i]
                    if slot.ready.is_set():
                        # Keep a running total instead of re-summing the
                        # whole cache after every eviction.
                        total -= slot.size()
                        del self._cache[i]
                        break
                else:
                    # Nothing is evictable right now.  Stop rather than
                    # spin while holding the lock.
                    break

    def _get(self, locator):
        # Caller must hold self._cache_lock.
        # Test if the locator is already in the cache
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the slot for locator (marking it most recently used), or
        None if it is not cached."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns a (slot, created) pair; created is True only when a new
        slot was added.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
212
213
class Counter(object):
    """Running total whose reads and updates are serialized by a lock."""

    def __init__(self, v=0):
        """Start the total at v."""
        self._val = v
        self._lk = threading.Lock()

    def add(self, v):
        """Increase the total by v."""
        self._lk.acquire()
        try:
            self._val += v
        finally:
            self._lk.release()

    def get(self):
        """Return the current total."""
        self._lk.acquire()
        try:
            current = self._val
        finally:
            self._lk.release()
        return current
226
227
228 class KeepClient(object):
229
230     # Default Keep server connection timeout:  2 seconds
231     # Default Keep server read timeout:       64 seconds
232     # Default Keep server bandwidth minimum:  32768 bytes per second
233     # Default Keep proxy connection timeout:  20 seconds
234     # Default Keep proxy read timeout:        64 seconds
235     # Default Keep proxy bandwidth minimum:   32768 bytes per second
236     DEFAULT_TIMEOUT = (2, 64, 32768)
237     DEFAULT_PROXY_TIMEOUT = (20, 64, 32768)
238
239     class ThreadLimiter(object):
240         """Limit the number of threads writing to Keep at once.
241
242         This ensures that only a number of writer threads that could
243         potentially achieve the desired replication level run at once.
244         Once the desired replication level is achieved, queued threads
245         are instructed not to run.
246
247         Should be used in a "with" block.
248         """
249         def __init__(self, want_copies, max_service_replicas):
250             self._started = 0
251             self._want_copies = want_copies
252             self._done = 0
253             self._response = None
254             self._start_lock = threading.Condition()
255             if (not max_service_replicas) or (max_service_replicas >= want_copies):
256                 max_threads = 1
257             else:
258                 max_threads = math.ceil(float(want_copies) / max_service_replicas)
259             _logger.debug("Limiter max threads is %d", max_threads)
260             self._todo_lock = threading.Semaphore(max_threads)
261             self._done_lock = threading.Lock()
262             self._local = threading.local()
263
264         def __enter__(self):
265             self._start_lock.acquire()
266             if getattr(self._local, 'sequence', None) is not None:
267                 # If the calling thread has used set_sequence(N), then
268                 # we wait here until N other threads have started.
269                 while self._started < self._local.sequence:
270                     self._start_lock.wait()
271             self._todo_lock.acquire()
272             self._started += 1
273             self._start_lock.notifyAll()
274             self._start_lock.release()
275             return self
276
277         def __exit__(self, type, value, traceback):
278             self._todo_lock.release()
279
280         def set_sequence(self, sequence):
281             self._local.sequence = sequence
282
283         def shall_i_proceed(self):
284             """
285             Return true if the current thread should write to Keep.
286             Return false otherwise.
287             """
288             with self._done_lock:
289                 return (self._done < self._want_copies)
290
291         def save_response(self, response_body, replicas_stored):
292             """
293             Records a response body (a locator, possibly signed) returned by
294             the Keep server, and the number of replicas it stored.
295             """
296             with self._done_lock:
297                 self._done += replicas_stored
298                 self._response = response_body
299
300         def response(self):
301             """Return the body from the response to a PUT request."""
302             with self._done_lock:
303                 return self._response
304
305         def done(self):
306             """Return the total number of replicas successfully stored."""
307             with self._done_lock:
308                 return self._done
309
310     class KeepService(object):
311         """Make requests to a single Keep service, and track results.
312
313         A KeepService is intended to last long enough to perform one
314         transaction (GET or PUT) against one Keep service. This can
315         involve calling either get() or put() multiple times in order
316         to retry after transient failures. However, calling both get()
317         and put() on a single instance -- or using the same instance
318         to access two different Keep services -- will not produce
319         sensible behavior.
320         """
321
322         HTTP_ERRORS = (
323             socket.error,
324             ssl.SSLError,
325             arvados.errors.HttpError,
326         )
327
328         def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
329                      upload_counter=None,
330                      download_counter=None, **headers):
331             self.root = root
332             self._user_agent_pool = user_agent_pool
333             self._result = {'error': None}
334             self._usable = True
335             self._session = None
336             self.get_headers = {'Accept': 'application/octet-stream'}
337             self.get_headers.update(headers)
338             self.put_headers = headers
339             self.upload_counter = upload_counter
340             self.download_counter = download_counter
341
342         def usable(self):
343             """Is it worth attempting a request?"""
344             return self._usable
345
346         def finished(self):
347             """Did the request succeed or encounter permanent failure?"""
348             return self._result['error'] == False or not self._usable
349
350         def last_result(self):
351             return self._result
352
353         def _get_user_agent(self):
354             try:
355                 return self._user_agent_pool.get(False)
356             except Queue.Empty:
357                 return pycurl.Curl()
358
359         def _put_user_agent(self, ua):
360             try:
361                 ua.reset()
362                 self._user_agent_pool.put(ua, False)
363             except:
364                 ua.close()
365
366         @staticmethod
367         def _socket_open(family, socktype, protocol, address=None):
368             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
369             s = socket.socket(family, socktype, protocol)
370             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
371             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
372             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
373             return s
374
375         def get(self, locator, timeout=None):
376             # locator is a KeepLocator object.
377             url = self.root + str(locator)
378             _logger.debug("Request: GET %s", url)
379             curl = self._get_user_agent()
380             try:
381                 with timer.Timer() as t:
382                     self._headers = {}
383                     response_body = cStringIO.StringIO()
384                     curl.setopt(pycurl.NOSIGNAL, 1)
385                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
386                     curl.setopt(pycurl.URL, url.encode('utf-8'))
387                     curl.setopt(pycurl.HTTPHEADER, [
388                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
389                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
390                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
391                     self._setcurltimeouts(curl, timeout)
392                     try:
393                         curl.perform()
394                     except Exception as e:
395                         raise arvados.errors.HttpError(0, str(e))
396                     self._result = {
397                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
398                         'body': response_body.getvalue(),
399                         'headers': self._headers,
400                         'error': False,
401                     }
402                 ok = retry.check_http_response_success(self._result['status_code'])
403                 if not ok:
404                     self._result['error'] = arvados.errors.HttpError(
405                         self._result['status_code'],
406                         self._headers.get('x-status-line', 'Error'))
407             except self.HTTP_ERRORS as e:
408                 self._result = {
409                     'error': e,
410                 }
411                 ok = False
412             self._usable = ok != False
413             if self._result.get('status_code', None):
414                 # The client worked well enough to get an HTTP status
415                 # code, so presumably any problems are just on the
416                 # server side and it's OK to reuse the client.
417                 self._put_user_agent(curl)
418             else:
419                 # Don't return this client to the pool, in case it's
420                 # broken.
421                 curl.close()
422             if not ok:
423                 _logger.debug("Request fail: GET %s => %s: %s",
424                               url, type(self._result['error']), str(self._result['error']))
425                 return None
426             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
427                          self._result['status_code'],
428                          len(self._result['body']),
429                          t.msecs,
430                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
431             if self.download_counter:
432                 self.download_counter.add(len(self._result['body']))
433             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
434             if resp_md5 != locator.md5sum:
435                 _logger.warning("Checksum fail: md5(%s) = %s",
436                                 url, resp_md5)
437                 self._result['error'] = arvados.errors.HttpError(
438                     0, 'Checksum fail')
439                 return None
440             return self._result['body']
441
442         def put(self, hash_s, body, timeout=None):
443             url = self.root + hash_s
444             _logger.debug("Request: PUT %s", url)
445             curl = self._get_user_agent()
446             try:
447                 with timer.Timer() as t:
448                     self._headers = {}
449                     body_reader = cStringIO.StringIO(body)
450                     response_body = cStringIO.StringIO()
451                     curl.setopt(pycurl.NOSIGNAL, 1)
452                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
453                     curl.setopt(pycurl.URL, url.encode('utf-8'))
454                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
455                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
456                     # response) instead of sending the request body immediately.
457                     # This allows the server to reject the request if the request
458                     # is invalid or the server is read-only, without waiting for
459                     # the client to send the entire block.
460                     curl.setopt(pycurl.UPLOAD, True)
461                     curl.setopt(pycurl.INFILESIZE, len(body))
462                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
463                     curl.setopt(pycurl.HTTPHEADER, [
464                         '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
465                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
466                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
467                     self._setcurltimeouts(curl, timeout)
468                     try:
469                         curl.perform()
470                     except Exception as e:
471                         raise arvados.errors.HttpError(0, str(e))
472                     self._result = {
473                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
474                         'body': response_body.getvalue(),
475                         'headers': self._headers,
476                         'error': False,
477                     }
478                 ok = retry.check_http_response_success(self._result['status_code'])
479                 if not ok:
480                     self._result['error'] = arvados.errors.HttpError(
481                         self._result['status_code'],
482                         self._headers.get('x-status-line', 'Error'))
483             except self.HTTP_ERRORS as e:
484                 self._result = {
485                     'error': e,
486                 }
487                 ok = False
488             self._usable = ok != False # still usable if ok is True or None
489             if self._result.get('status_code', None):
490                 # Client is functional. See comment in get().
491                 self._put_user_agent(curl)
492             else:
493                 curl.close()
494             if not ok:
495                 _logger.debug("Request fail: PUT %s => %s: %s",
496                               url, type(self._result['error']), str(self._result['error']))
497                 return False
498             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
499                          self._result['status_code'],
500                          len(body),
501                          t.msecs,
502                          (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
503             if self.upload_counter:
504                 self.upload_counter.add(len(body))
505             return True
506
507         def _setcurltimeouts(self, curl, timeouts):
508             if not timeouts:
509                 return
510             elif isinstance(timeouts, tuple):
511                 if len(timeouts) == 2:
512                     conn_t, xfer_t = timeouts
513                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
514                 else:
515                     conn_t, xfer_t, bandwidth_bps = timeouts
516             else:
517                 conn_t, xfer_t = (timeouts, timeouts)
518                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
519             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
520             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
521             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
522
523         def _headerfunction(self, header_line):
524             header_line = header_line.decode('iso-8859-1')
525             if ':' in header_line:
526                 name, value = header_line.split(':', 1)
527                 name = name.strip().lower()
528                 value = value.strip()
529             elif self._headers:
530                 name = self._lastheadername
531                 value = self._headers[name] + ' ' + header_line.strip()
532             elif header_line.startswith('HTTP/'):
533                 name = 'x-status-line'
534                 value = header_line
535             else:
536                 _logger.error("Unexpected header line: %s", header_line)
537                 return
538             self._lastheadername = name
539             self._headers[name] = value
540             # Returning None implies all bytes were written
541
542
543     class KeepWriterThread(threading.Thread):
544         """
545         Write a blob of data to the given Keep server. On success, call
546         save_response() of the given ThreadLimiter to save the returned
547         locator.
548         """
549         def __init__(self, keep_service, **kwargs):
550             super(KeepClient.KeepWriterThread, self).__init__()
551             self.service = keep_service
552             self.args = kwargs
553             self._success = False
554
555         def success(self):
556             return self._success
557
558         def run(self):
559             limiter = self.args['thread_limiter']
560             sequence = self.args['thread_sequence']
561             if sequence is not None:
562                 limiter.set_sequence(sequence)
563             with limiter:
564                 if not limiter.shall_i_proceed():
565                     # My turn arrived, but the job has been done without
566                     # me.
567                     return
568                 self.run_with_limiter(limiter)
569
570         def run_with_limiter(self, limiter):
571             if self.service.finished():
572                 return
573             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
574                           str(threading.current_thread()),
575                           self.args['data_hash'],
576                           len(self.args['data']),
577                           self.args['service_root'])
578             self._success = bool(self.service.put(
579                 self.args['data_hash'],
580                 self.args['data'],
581                 timeout=self.args.get('timeout', None)))
582             result = self.service.last_result()
583             if self._success:
584                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
585                               str(threading.current_thread()),
586                               self.args['data_hash'],
587                               len(self.args['data']),
588                               self.args['service_root'])
589                 # Tick the 'done' counter for the number of replica
590                 # reported stored by the server, for the case that
591                 # we're talking to a proxy or other backend that
592                 # stores to multiple copies for us.
593                 try:
594                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
595                 except (KeyError, ValueError):
596                     replicas_stored = 1
597                 limiter.save_response(result['body'].strip(), replicas_stored)
598             elif result.get('status_code', None):
599                 _logger.debug("Request fail: PUT %s => %s %s",
600                               self.args['data_hash'],
601                               result['status_code'],
602                               result['body'])
603
604
605     def __init__(self, api_client=None, proxy=None,
606                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
607                  api_token=None, local_store=None, block_cache=None,
608                  num_retries=0, session=None):
609         """Initialize a new KeepClient.
610
611         Arguments:
612         :api_client:
613           The API client to use to find Keep services.  If not
614           provided, KeepClient will build one from available Arvados
615           configuration.
616
617         :proxy:
618           If specified, this KeepClient will send requests to this Keep
619           proxy.  Otherwise, KeepClient will fall back to the setting of the
620           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
621           KeepClient does not use a proxy, pass in an empty string.
622
623         :timeout:
624           The initial timeout (in seconds) for HTTP requests to Keep
625           non-proxy servers.  A tuple of three floats is interpreted as
626           (connection_timeout, read_timeout, minimum_bandwidth). A connection
627           will be aborted if the average traffic rate falls below
628           minimum_bandwidth bytes per second over an interval of read_timeout
629           seconds. Because timeouts are often a result of transient server
630           load, the actual connection timeout will be increased by a factor
631           of two on each retry.
632           Default: (2, 64, 32768).
633
634         :proxy_timeout:
635           The initial timeout (in seconds) for HTTP requests to
636           Keep proxies. A tuple of three floats is interpreted as
637           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
638           described above for adjusting connection timeouts on retry also
639           applies.
640           Default: (20, 64, 32768).
641
642         :api_token:
643           If you're not using an API client, but only talking
644           directly to a Keep proxy, this parameter specifies an API token
645           to authenticate Keep requests.  It is an error to specify both
646           api_client and api_token.  If you specify neither, KeepClient
647           will use one available from the Arvados configuration.
648
649         :local_store:
650           If specified, this KeepClient will bypass Keep
651           services, and save data to the named directory.  If unspecified,
652           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
653           environment variable.  If you want to ensure KeepClient does not
654           use local storage, pass in an empty string.  This is primarily
655           intended to mock a server for testing.
656
657         :num_retries:
658           The default number of times to retry failed requests.
659           This will be used as the default num_retries value when get() and
660           put() are called.  Default 0.
661         """
662         self.lock = threading.Lock()
663         if proxy is None:
664             proxy = config.get('ARVADOS_KEEP_PROXY')
665         if api_token is None:
666             if api_client is None:
667                 api_token = config.get('ARVADOS_API_TOKEN')
668             else:
669                 api_token = api_client.api_token
670         elif api_client is not None:
671             raise ValueError(
672                 "can't build KeepClient with both API client and token")
673         if local_store is None:
674             local_store = os.environ.get('KEEP_LOCAL_STORE')
675
676         self.block_cache = block_cache if block_cache else KeepBlockCache()
677         self.timeout = timeout
678         self.proxy_timeout = proxy_timeout
679         self._user_agent_pool = Queue.LifoQueue()
680         self.upload_counter = Counter()
681         self.download_counter = Counter()
682         self.put_counter = Counter()
683         self.get_counter = Counter()
684         self.hits_counter = Counter()
685         self.misses_counter = Counter()
686
687         if local_store:
688             self.local_store = local_store
689             self.get = self.local_store_get
690             self.put = self.local_store_put
691         else:
692             self.num_retries = num_retries
693             self.max_replicas_per_service = None
694             if proxy:
695                 if not proxy.endswith('/'):
696                     proxy += '/'
697                 self.api_token = api_token
698                 self._gateway_services = {}
699                 self._keep_services = [{
700                     'uuid': 'proxy',
701                     'service_type': 'proxy',
702                     '_service_root': proxy,
703                     }]
704                 self._writable_services = self._keep_services
705                 self.using_proxy = True
706                 self._static_services_list = True
707             else:
708                 # It's important to avoid instantiating an API client
709                 # unless we actually need one, for testing's sake.
710                 if api_client is None:
711                     api_client = arvados.api('v1')
712                 self.api_client = api_client
713                 self.api_token = api_client.api_token
714                 self._gateway_services = {}
715                 self._keep_services = None
716                 self._writable_services = None
717                 self.using_proxy = None
718                 self._static_services_list = False
719
720     def current_timeout(self, attempt_number):
721         """Return the appropriate timeout to use for this client.
722
723         The proxy timeout setting if the backend service is currently a proxy,
724         the regular timeout setting otherwise.  The `attempt_number` indicates
725         how many times the operation has been tried already (starting from 0
726         for the first try), and scales the connection timeout portion of the
727         return value accordingly.
728
729         """
730         # TODO(twp): the timeout should be a property of a
731         # KeepService, not a KeepClient. See #4488.
732         t = self.proxy_timeout if self.using_proxy else self.timeout
733         if len(t) == 2:
734             return (t[0] * (1 << attempt_number), t[1])
735         else:
736             return (t[0] * (1 << attempt_number), t[1], t[2])
737     def _any_nondisk_services(self, service_list):
738         return any(ks.get('service_type', 'disk') != 'disk'
739                    for ks in service_list)
740
741     def build_services_list(self, force_rebuild=False):
742         if (self._static_services_list or
743               (self._keep_services and not force_rebuild)):
744             return
745         with self.lock:
746             try:
747                 keep_services = self.api_client.keep_services().accessible()
748             except Exception:  # API server predates Keep services.
749                 keep_services = self.api_client.keep_disks().list()
750
751             # Gateway services are only used when specified by UUID,
752             # so there's nothing to gain by filtering them by
753             # service_type.
754             self._gateway_services = {ks['uuid']: ks for ks in
755                                       keep_services.execute()['items']}
756             if not self._gateway_services:
757                 raise arvados.errors.NoKeepServersError()
758
759             # Precompute the base URI for each service.
760             for r in self._gateway_services.itervalues():
761                 host = r['service_host']
762                 if not host.startswith('[') and host.find(':') >= 0:
763                     # IPv6 URIs must be formatted like http://[::1]:80/...
764                     host = '[' + host + ']'
765                 r['_service_root'] = "{}://{}:{:d}/".format(
766                     'https' if r['service_ssl_flag'] else 'http',
767                     host,
768                     r['service_port'])
769
770             _logger.debug(str(self._gateway_services))
771             self._keep_services = [
772                 ks for ks in self._gateway_services.itervalues()
773                 if not ks.get('service_type', '').startswith('gateway:')]
774             self._writable_services = [ks for ks in self._keep_services
775                                        if not ks.get('read_only')]
776
777             # For disk type services, max_replicas_per_service is 1
778             # It is unknown (unlimited) for other service types.
779             if self._any_nondisk_services(self._writable_services):
780                 self.max_replicas_per_service = None
781             else:
782                 self.max_replicas_per_service = 1
783
784     def _service_weight(self, data_hash, service_uuid):
785         """Compute the weight of a Keep service endpoint for a data
786         block with a known hash.
787
788         The weight is md5(h + u) where u is the last 15 characters of
789         the service endpoint's UUID.
790         """
791         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
792
793     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
794         """Return an array of Keep service endpoints, in the order in
795         which they should be probed when reading or writing data with
796         the given hash+hints.
797         """
798         self.build_services_list(force_rebuild)
799
800         sorted_roots = []
801         # Use the services indicated by the given +K@... remote
802         # service hints, if any are present and can be resolved to a
803         # URI.
804         for hint in locator.hints:
805             if hint.startswith('K@'):
806                 if len(hint) == 7:
807                     sorted_roots.append(
808                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
809                 elif len(hint) == 29:
810                     svc = self._gateway_services.get(hint[2:])
811                     if svc:
812                         sorted_roots.append(svc['_service_root'])
813
814         # Sort the available local services by weight (heaviest first)
815         # for this locator, and return their service_roots (base URIs)
816         # in that order.
817         use_services = self._keep_services
818         if need_writable:
819             use_services = self._writable_services
820         self.using_proxy = self._any_nondisk_services(use_services)
821         sorted_roots.extend([
822             svc['_service_root'] for svc in sorted(
823                 use_services,
824                 reverse=True,
825                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
826         _logger.debug("{}: {}".format(locator, sorted_roots))
827         return sorted_roots
828
829     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
830         # roots_map is a dictionary, mapping Keep service root strings
831         # to KeepService objects.  Poll for Keep services, and add any
832         # new ones to roots_map.  Return the current list of local
833         # root strings.
834         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
835         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
836         for root in local_roots:
837             if root not in roots_map:
838                 roots_map[root] = self.KeepService(
839                     root, self._user_agent_pool,
840                     upload_counter=self.upload_counter,
841                     download_counter=self.download_counter,
842                     **headers)
843         return local_roots
844
845     @staticmethod
846     def _check_loop_result(result):
847         # KeepClient RetryLoops should save results as a 2-tuple: the
848         # actual result of the request, and the number of servers available
849         # to receive the request this round.
850         # This method returns True if there's a real result, False if
851         # there are no more servers available, otherwise None.
852         if isinstance(result, Exception):
853             return None
854         result, tried_server_count = result
855         if (result is not None) and (result is not False):
856             return True
857         elif tried_server_count < 1:
858             _logger.info("No more Keep services to try; giving up")
859             return False
860         else:
861             return None
862
863     def get_from_cache(self, loc):
864         """Fetch a block only if is in the cache, otherwise return None."""
865         slot = self.block_cache.get(loc)
866         if slot is not None and slot.ready.is_set():
867             return slot.get()
868         else:
869             return None
870
871     @retry.retry_method
872     def get(self, loc_s, num_retries=None):
873         """Get data from Keep.
874
875         This method fetches one or more blocks of data from Keep.  It
876         sends a request each Keep service registered with the API
877         server (or the proxy provided when this client was
878         instantiated), then each service named in location hints, in
879         sequence.  As soon as one service provides the data, it's
880         returned.
881
882         Arguments:
883         * loc_s: A string of one or more comma-separated locators to fetch.
884           This method returns the concatenation of these blocks.
885         * num_retries: The number of times to retry GET requests to
886           *each* Keep server if it returns temporary failures, with
887           exponential backoff.  Note that, in each loop, the method may try
888           to fetch data from every available Keep service, along with any
889           that are named in location hints in the locator.  The default value
890           is set when the KeepClient is initialized.
891         """
892         if ',' in loc_s:
893             return ''.join(self.get(x) for x in loc_s.split(','))
894
895         self.get_counter.add(1)
896
897         locator = KeepLocator(loc_s)
898         slot, first = self.block_cache.reserve_cache(locator.md5sum)
899         if not first:
900             self.hits_counter.add(1)
901             v = slot.get()
902             return v
903
904         self.misses_counter.add(1)
905
906         # If the locator has hints specifying a prefix (indicating a
907         # remote keepproxy) or the UUID of a local gateway service,
908         # read data from the indicated service(s) instead of the usual
909         # list of local disk services.
910         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
911                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
912         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
913                            for hint in locator.hints if (
914                                    hint.startswith('K@') and
915                                    len(hint) == 29 and
916                                    self._gateway_services.get(hint[2:])
917                                    )])
918         # Map root URLs to their KeepService objects.
919         roots_map = {
920             root: self.KeepService(root, self._user_agent_pool,
921                                    upload_counter=self.upload_counter,
922                                    download_counter=self.download_counter)
923             for root in hint_roots
924         }
925
926         # See #3147 for a discussion of the loop implementation.  Highlights:
927         # * Refresh the list of Keep services after each failure, in case
928         #   it's being updated.
929         # * Retry until we succeed, we're out of retries, or every available
930         #   service has returned permanent failure.
931         sorted_roots = []
932         roots_map = {}
933         blob = None
934         loop = retry.RetryLoop(num_retries, self._check_loop_result,
935                                backoff_start=2)
936         for tries_left in loop:
937             try:
938                 sorted_roots = self.map_new_services(
939                     roots_map, locator,
940                     force_rebuild=(tries_left < num_retries),
941                     need_writable=False)
942             except Exception as error:
943                 loop.save_result(error)
944                 continue
945
946             # Query KeepService objects that haven't returned
947             # permanent failure, in our specified shuffle order.
948             services_to_try = [roots_map[root]
949                                for root in sorted_roots
950                                if roots_map[root].usable()]
951             for keep_service in services_to_try:
952                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
953                 if blob is not None:
954                     break
955             loop.save_result((blob, len(services_to_try)))
956
957         # Always cache the result, then return it if we succeeded.
958         slot.set(blob)
959         self.block_cache.cap_cache()
960         if loop.success():
961             return blob
962
963         # Q: Including 403 is necessary for the Keep tests to continue
964         # passing, but maybe they should expect KeepReadError instead?
965         not_founds = sum(1 for key in sorted_roots
966                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
967         service_errors = ((key, roots_map[key].last_result()['error'])
968                           for key in sorted_roots)
969         if not roots_map:
970             raise arvados.errors.KeepReadError(
971                 "failed to read {}: no Keep services available ({})".format(
972                     loc_s, loop.last_result()))
973         elif not_founds == len(sorted_roots):
974             raise arvados.errors.NotFoundError(
975                 "{} not found".format(loc_s), service_errors)
976         else:
977             raise arvados.errors.KeepReadError(
978                 "failed to read {}".format(loc_s), service_errors, label="service")
979
980     @retry.retry_method
981     def put(self, data, copies=2, num_retries=None):
982         """Save data in Keep.
983
984         This method will get a list of Keep services from the API server, and
985         send the data to each one simultaneously in a new thread.  Once the
986         uploads are finished, if enough copies are saved, this method returns
987         the most recent HTTP response body.  If requests fail to upload
988         enough copies, this method raises KeepWriteError.
989
990         Arguments:
991         * data: The string of data to upload.
992         * copies: The number of copies that the user requires be saved.
993           Default 2.
994         * num_retries: The number of times to retry PUT requests to
995           *each* Keep server if it returns temporary failures, with
996           exponential backoff.  The default value is set when the
997           KeepClient is initialized.
998         """
999
1000         if isinstance(data, unicode):
1001             data = data.encode("ascii")
1002         elif not isinstance(data, str):
1003             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
1004
1005         self.put_counter.add(1)
1006
1007         data_hash = hashlib.md5(data).hexdigest()
1008         loc_s = data_hash + '+' + str(len(data))
1009         if copies < 1:
1010             return loc_s
1011         locator = KeepLocator(loc_s)
1012
1013         headers = {}
1014         # Tell the proxy how many copies we want it to store
1015         headers['X-Keep-Desired-Replication'] = str(copies)
1016         roots_map = {}
1017         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1018                                backoff_start=2)
1019         for tries_left in loop:
1020             try:
1021                 sorted_roots = self.map_new_services(
1022                     roots_map, locator,
1023                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1024             except Exception as error:
1025                 loop.save_result(error)
1026                 continue
1027
1028             thread_limiter = KeepClient.ThreadLimiter(
1029                 copies, self.max_replicas_per_service)
1030             threads = []
1031             for service_root, ks in [(root, roots_map[root])
1032                                      for root in sorted_roots]:
1033                 if ks.finished():
1034                     continue
1035                 t = KeepClient.KeepWriterThread(
1036                     ks,
1037                     data=data,
1038                     data_hash=data_hash,
1039                     service_root=service_root,
1040                     thread_limiter=thread_limiter,
1041                     timeout=self.current_timeout(num_retries-tries_left),
1042                     thread_sequence=len(threads))
1043                 t.start()
1044                 threads.append(t)
1045             for t in threads:
1046                 t.join()
1047             loop.save_result((thread_limiter.done() >= copies, len(threads)))
1048
1049         if loop.success():
1050             return thread_limiter.response()
1051         if not roots_map:
1052             raise arvados.errors.KeepWriteError(
1053                 "failed to write {}: no Keep services available ({})".format(
1054                     data_hash, loop.last_result()))
1055         else:
1056             service_errors = ((key, roots_map[key].last_result()['error'])
1057                               for key in sorted_roots
1058                               if roots_map[key].last_result()['error'])
1059             raise arvados.errors.KeepWriteError(
1060                 "failed to write {} (wanted {} copies but wrote {})".format(
1061                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
1062
1063     def local_store_put(self, data, copies=1, num_retries=None):
1064         """A stub for put().
1065
1066         This method is used in place of the real put() method when
1067         using local storage (see constructor's local_store argument).
1068
1069         copies and num_retries arguments are ignored: they are here
1070         only for the sake of offering the same call signature as
1071         put().
1072
1073         Data stored this way can be retrieved via local_store_get().
1074         """
1075         md5 = hashlib.md5(data).hexdigest()
1076         locator = '%s+%d' % (md5, len(data))
1077         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1078             f.write(data)
1079         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1080                   os.path.join(self.local_store, md5))
1081         return locator
1082
1083     def local_store_get(self, loc_s, num_retries=None):
1084         """Companion to local_store_put()."""
1085         try:
1086             locator = KeepLocator(loc_s)
1087         except ValueError:
1088             raise arvados.errors.NotFoundError(
1089                 "Invalid data locator: '%s'" % loc_s)
1090         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1091             return ''
1092         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1093             return f.read()
1094
1095     def is_cached(self, locator):
1096         return self.block_cache.reserve_cache(expect_hash)