7696: PySDK KeepClient uses all service types.
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import os
6 import pycurl
7 import Queue
8 import re
9 import socket
10 import ssl
11 import threading
12 import timer
13
14 import arvados
15 import arvados.config as config
16 import arvados.errors
17 import arvados.retry as retry
18 import arvados.util
19
# Logger used by everything in this module.
_logger = logging.getLogger('arvados.keep')
# Process-wide KeepClient cached by Keep.global_client_object(); stays None
# until that method first builds one.
global_client_object = None
22
23
class KeepLocator(object):
    """Parse, validate and re-serialize a Keep block locator.

    A locator string has the form ``<md5sum>[+<size>[+<hint>...]]``.  A
    hint that starts with ``A`` is parsed as a permission hint
    (``A<signature>@<hex Unix timestamp>``); every other hint is kept
    verbatim, in order, in ``self.hints``.
    """
    # Naive datetime for the Unix epoch.  Permission expiry times are
    # stored as naive UTC datetimes and converted back relative to this.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A hint is one uppercase letter followed by one or more of
    # [A-Za-z0-9@_-].
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str.

        Raises ValueError if the md5sum, the size, or any hint is
        malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # A bare md5sum with no size is a valid locator.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Return the full locator: md5sum, size, permission hint, hints.

        Components that are None (missing size, missing permission hint)
        are omitted.
        """
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints (md5sum, plus size if known)."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.  This runs at class
        # definition time, which is why it takes no self.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None if unset."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # value is the hex timestamp text from the hint, not a datetime.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the ``A<sig>@<expiry>`` hint, or None if either part is unset."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint like ``A<sig>@<expiry>``.

        Raises ValueError if the hint has no ``@`` separator, or if either
        part fails validation in its property setter.
        """
        # Split before assigning: a hint without '@' used to fail inside
        # tuple unpacking with a ValueError that the old
        # ``except IndexError`` never caught, so the intended message was
        # never produced.
        sig, separator, expiry = s[1:].partition('@')
        if not separator:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig, self.perm_expiry = sig, expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired as of as_of_dt.

        as_of_dt is a naive UTC datetime and defaults to the current UTC
        time.  A locator without a permission expiry never expires.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is built with utcfromtimestamp(), so "now" must
            # also be UTC; local time would skew the result by the UTC
            # offset.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
105
106
class Keep(object):
    """Thin facade over a single, process-wide KeepClient.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Configuration snapshot the cached global client was built from.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if settings changed."""
        global global_client_object
        # KeepClient used to react to these settings at runtime.  That is
        # emulated here: snapshot the settings on every call and build a
        # fresh client whenever the snapshot differs from the last one.
        snapshot = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is not None and cls._last_key == snapshot:
            return global_client_object
        global_client_object = KeepClient()
        cls._last_key = snapshot
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Forward to get() on the global KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Forward to put() on the global KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)

141
class KeepBlockCache(object):
    """Lock-protected list of block cache slots, most recently used first."""

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self._cache_lock = threading.Lock()
        self._cache = []
        self.cache_max = cache_max

    class CacheSlot(object):
        """Holder for one block's content, filled in after it is reserved."""

        def __init__(self, locator):
            self.content = None
            self.ready = threading.Event()
            self.locator = locator

        def get(self):
            """Block until set() has been called, then return the content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the content and wake up every waiting get()."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the content length, or 0 while there is no content."""
            return 0 if self.content is None else len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # A slot that is ready but has no content recorded a failed
            # read; drop those and keep everything else.
            self._cache = [
                slot for slot in self._cache
                if slot.content is not None or not slot.ready.is_set()]

            def bytes_held():
                return sum(slot.size() for slot in self._cache)

            held = bytes_held()
            while self._cache and held > self.cache_max:
                # Evict the ready slot closest to the tail (least recently
                # used); slots still being filled are left alone.
                for position in range(len(self._cache) - 1, -1, -1):
                    if self._cache[position].ready.is_set():
                        del self._cache[position]
                        break
                held = bytes_held()

    def _get(self, locator):
        # Look for the locator and, when found, promote its slot to the
        # front of the list.  Callers must hold self._cache_lock.
        for position, slot in enumerate(self._cache):
            if slot.locator == locator:
                if position != 0:
                    del self._cache[position]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the slot cached for locator, or None if there is none."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            existing = self._get(locator)
            if existing:
                return existing, False
            # Nothing cached yet: create a slot at the front of the list.
            fresh = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, fresh)
            return fresh, True

211
212 class KeepClient(object):
213
    # Each value is a (connection timeout, read timeout) pair in seconds and
    # is the default for the matching __init__ argument.
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:      300 seconds
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:       300 seconds
    DEFAULT_TIMEOUT = (2, 300)
    DEFAULT_PROXY_TIMEOUT = (20, 300)
220
221     class ThreadLimiter(object):
222         """
223         Limit the number of threads running at a given time to
224         {desired successes} minus {successes reported}. When successes
225         reported == desired, wake up the remaining threads and tell
226         them to quit.
227
228         Should be used in a "with" block.
229         """
230         def __init__(self, todo):
231             self._started = 0
232             self._todo = todo
233             self._done = 0
234             self._response = None
235             self._start_lock = threading.Condition()
236             self._todo_lock = threading.Semaphore(todo)
237             self._done_lock = threading.Lock()
238             self._local = threading.local()
239
240         def __enter__(self):
241             self._start_lock.acquire()
242             if getattr(self._local, 'sequence', None) is not None:
243                 # If the calling thread has used set_sequence(N), then
244                 # we wait here until N other threads have started.
245                 while self._started < self._local.sequence:
246                     self._start_lock.wait()
247             self._todo_lock.acquire()
248             self._started += 1
249             self._start_lock.notifyAll()
250             self._start_lock.release()
251             return self
252
253         def __exit__(self, type, value, traceback):
254             self._todo_lock.release()
255
256         def set_sequence(self, sequence):
257             self._local.sequence = sequence
258
259         def shall_i_proceed(self):
260             """
261             Return true if the current thread should do stuff. Return
262             false if the current thread should just stop.
263             """
264             with self._done_lock:
265                 return (self._done < self._todo)
266
267         def save_response(self, response_body, replicas_stored):
268             """
269             Records a response body (a locator, possibly signed) returned by
270             the Keep server.  It is not necessary to save more than
271             one response, since we presume that any locator returned
272             in response to a successful request is valid.
273             """
274             with self._done_lock:
275                 self._done += replicas_stored
276                 self._response = response_body
277
278         def response(self):
279             """
280             Returns the body from the response to a PUT request.
281             """
282             with self._done_lock:
283                 return self._response
284
285         def done(self):
286             """
287             Return how many successes were reported.
288             """
289             with self._done_lock:
290                 return self._done
291
292
293     class KeepService(object):
294         """Make requests to a single Keep service, and track results.
295
296         A KeepService is intended to last long enough to perform one
297         transaction (GET or PUT) against one Keep service. This can
298         involve calling either get() or put() multiple times in order
299         to retry after transient failures. However, calling both get()
300         and put() on a single instance -- or using the same instance
301         to access two different Keep services -- will not produce
302         sensible behavior.
303         """
304
305         HTTP_ERRORS = (
306             socket.error,
307             ssl.SSLError,
308             arvados.errors.HttpError,
309         )
310
311         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
312             self.root = root
313             self._user_agent_pool = user_agent_pool
314             self._result = {'error': None}
315             self._usable = True
316             self._session = None
317             self.get_headers = {'Accept': 'application/octet-stream'}
318             self.get_headers.update(headers)
319             self.put_headers = headers
320
321         def usable(self):
322             """Is it worth attempting a request?"""
323             return self._usable
324
325         def finished(self):
326             """Did the request succeed or encounter permanent failure?"""
327             return self._result['error'] == False or not self._usable
328
329         def last_result(self):
330             return self._result
331
332         def _get_user_agent(self):
333             try:
334                 return self._user_agent_pool.get(False)
335             except Queue.Empty:
336                 return pycurl.Curl()
337
338         def _put_user_agent(self, ua):
339             try:
340                 ua.reset()
341                 self._user_agent_pool.put(ua, False)
342             except:
343                 ua.close()
344
345         @staticmethod
346         def _socket_open(family, socktype, protocol, address=None):
347             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
348             s = socket.socket(family, socktype, protocol)
349             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
350             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
351             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
352             return s
353
354         def get(self, locator, timeout=None):
355             # locator is a KeepLocator object.
356             url = self.root + str(locator)
357             _logger.debug("Request: GET %s", url)
358             curl = self._get_user_agent()
359             try:
360                 with timer.Timer() as t:
361                     self._headers = {}
362                     response_body = cStringIO.StringIO()
363                     curl.setopt(pycurl.NOSIGNAL, 1)
364                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
365                     curl.setopt(pycurl.URL, url.encode('utf-8'))
366                     curl.setopt(pycurl.HTTPHEADER, [
367                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
368                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
369                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
370                     self._setcurltimeouts(curl, timeout)
371                     try:
372                         curl.perform()
373                     except Exception as e:
374                         raise arvados.errors.HttpError(0, str(e))
375                     self._result = {
376                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
377                         'body': response_body.getvalue(),
378                         'headers': self._headers,
379                         'error': False,
380                     }
381                 ok = retry.check_http_response_success(self._result['status_code'])
382                 if not ok:
383                     self._result['error'] = arvados.errors.HttpError(
384                         self._result['status_code'],
385                         self._headers.get('x-status-line', 'Error'))
386             except self.HTTP_ERRORS as e:
387                 self._result = {
388                     'error': e,
389                 }
390                 ok = False
391             self._usable = ok != False
392             if self._result.get('status_code', None):
393                 # The client worked well enough to get an HTTP status
394                 # code, so presumably any problems are just on the
395                 # server side and it's OK to reuse the client.
396                 self._put_user_agent(curl)
397             else:
398                 # Don't return this client to the pool, in case it's
399                 # broken.
400                 curl.close()
401             if not ok:
402                 _logger.debug("Request fail: GET %s => %s: %s",
403                               url, type(self._result['error']), str(self._result['error']))
404                 return None
405             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
406                          self._result['status_code'],
407                          len(self._result['body']),
408                          t.msecs,
409                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
410             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
411             if resp_md5 != locator.md5sum:
412                 _logger.warning("Checksum fail: md5(%s) = %s",
413                                 url, resp_md5)
414                 self._result['error'] = arvados.errors.HttpError(
415                     0, 'Checksum fail')
416                 return None
417             return self._result['body']
418
419         def put(self, hash_s, body, timeout=None):
420             url = self.root + hash_s
421             _logger.debug("Request: PUT %s", url)
422             curl = self._get_user_agent()
423             try:
424                 self._headers = {}
425                 body_reader = cStringIO.StringIO(body)
426                 response_body = cStringIO.StringIO()
427                 curl.setopt(pycurl.NOSIGNAL, 1)
428                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
429                 curl.setopt(pycurl.URL, url.encode('utf-8'))
430                 # Using UPLOAD tells cURL to wait for a "go ahead" from the
431                 # Keep server (in the form of a HTTP/1.1 "100 Continue"
432                 # response) instead of sending the request body immediately.
433                 # This allows the server to reject the request if the request
434                 # is invalid or the server is read-only, without waiting for
435                 # the client to send the entire block.
436                 curl.setopt(pycurl.UPLOAD, True)
437                 curl.setopt(pycurl.INFILESIZE, len(body))
438                 curl.setopt(pycurl.READFUNCTION, body_reader.read)
439                 curl.setopt(pycurl.HTTPHEADER, [
440                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
441                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
442                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
443                 self._setcurltimeouts(curl, timeout)
444                 try:
445                     curl.perform()
446                 except Exception as e:
447                     raise arvados.errors.HttpError(0, str(e))
448                 self._result = {
449                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
450                     'body': response_body.getvalue(),
451                     'headers': self._headers,
452                     'error': False,
453                 }
454                 ok = retry.check_http_response_success(self._result['status_code'])
455                 if not ok:
456                     self._result['error'] = arvados.errors.HttpError(
457                         self._result['status_code'],
458                         self._headers.get('x-status-line', 'Error'))
459             except self.HTTP_ERRORS as e:
460                 self._result = {
461                     'error': e,
462                 }
463                 ok = False
464             self._usable = ok != False # still usable if ok is True or None
465             if self._result.get('status_code', None):
466                 # Client is functional. See comment in get().
467                 self._put_user_agent(curl)
468             else:
469                 curl.close()
470             if not ok:
471                 _logger.debug("Request fail: PUT %s => %s: %s",
472                               url, type(self._result['error']), str(self._result['error']))
473                 return False
474             return True
475
476         def _setcurltimeouts(self, curl, timeouts):
477             if not timeouts:
478                 return
479             elif isinstance(timeouts, tuple):
480                 conn_t, xfer_t = timeouts
481             else:
482                 conn_t, xfer_t = (timeouts, timeouts)
483             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
484             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
485
486         def _headerfunction(self, header_line):
487             header_line = header_line.decode('iso-8859-1')
488             if ':' in header_line:
489                 name, value = header_line.split(':', 1)
490                 name = name.strip().lower()
491                 value = value.strip()
492             elif self._headers:
493                 name = self._lastheadername
494                 value = self._headers[name] + ' ' + header_line.strip()
495             elif header_line.startswith('HTTP/'):
496                 name = 'x-status-line'
497                 value = header_line
498             else:
499                 _logger.error("Unexpected header line: %s", header_line)
500                 return
501             self._lastheadername = name
502             self._headers[name] = value
503             # Returning None implies all bytes were written
504
505
506     class KeepWriterThread(threading.Thread):
507         """
508         Write a blob of data to the given Keep server. On success, call
509         save_response() of the given ThreadLimiter to save the returned
510         locator.
511         """
512         def __init__(self, keep_service, **kwargs):
513             super(KeepClient.KeepWriterThread, self).__init__()
514             self.service = keep_service
515             self.args = kwargs
516             self._success = False
517
518         def success(self):
519             return self._success
520
521         def run(self):
522             limiter = self.args['thread_limiter']
523             sequence = self.args['thread_sequence']
524             if sequence is not None:
525                 limiter.set_sequence(sequence)
526             with limiter:
527                 if not limiter.shall_i_proceed():
528                     # My turn arrived, but the job has been done without
529                     # me.
530                     return
531                 self.run_with_limiter(limiter)
532
533         def run_with_limiter(self, limiter):
534             if self.service.finished():
535                 return
536             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
537                           str(threading.current_thread()),
538                           self.args['data_hash'],
539                           len(self.args['data']),
540                           self.args['service_root'])
541             self._success = bool(self.service.put(
542                 self.args['data_hash'],
543                 self.args['data'],
544                 timeout=self.args.get('timeout', None)))
545             result = self.service.last_result()
546             if self._success:
547                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
548                               str(threading.current_thread()),
549                               self.args['data_hash'],
550                               len(self.args['data']),
551                               self.args['service_root'])
552                 # Tick the 'done' counter for the number of replica
553                 # reported stored by the server, for the case that
554                 # we're talking to a proxy or other backend that
555                 # stores to multiple copies for us.
556                 try:
557                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
558                 except (KeyError, ValueError):
559                     replicas_stored = 1
560                 limiter.save_response(result['body'].strip(), replicas_stored)
561             elif result.get('status_code', None):
562                 _logger.debug("Request fail: PUT %s => %s %s",
563                               self.args['data_hash'],
564                               result['status_code'],
565                               result['body'])
566
567
568     def __init__(self, api_client=None, proxy=None,
569                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
570                  api_token=None, local_store=None, block_cache=None,
571                  num_retries=0, session=None):
572         """Initialize a new KeepClient.
573
574         Arguments:
575         :api_client:
576           The API client to use to find Keep services.  If not
577           provided, KeepClient will build one from available Arvados
578           configuration.
579
580         :proxy:
581           If specified, this KeepClient will send requests to this Keep
582           proxy.  Otherwise, KeepClient will fall back to the setting of the
583           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
584           KeepClient does not use a proxy, pass in an empty string.
585
586         :timeout:
587           The initial timeout (in seconds) for HTTP requests to Keep
588           non-proxy servers.  A tuple of two floats is interpreted as
589           (connection_timeout, read_timeout): see
590           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
591           Because timeouts are often a result of transient server load, the
592           actual connection timeout will be increased by a factor of two on
593           each retry.
594           Default: (2, 300).
595
596         :proxy_timeout:
597           The initial timeout (in seconds) for HTTP requests to
598           Keep proxies. A tuple of two floats is interpreted as
599           (connection_timeout, read_timeout). The behavior described
600           above for adjusting connection timeouts on retry also applies.
601           Default: (20, 300).
602
603         :api_token:
604           If you're not using an API client, but only talking
605           directly to a Keep proxy, this parameter specifies an API token
606           to authenticate Keep requests.  It is an error to specify both
607           api_client and api_token.  If you specify neither, KeepClient
608           will use one available from the Arvados configuration.
609
610         :local_store:
611           If specified, this KeepClient will bypass Keep
612           services, and save data to the named directory.  If unspecified,
613           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
614           environment variable.  If you want to ensure KeepClient does not
615           use local storage, pass in an empty string.  This is primarily
616           intended to mock a server for testing.
617
618         :num_retries:
619           The default number of times to retry failed requests.
620           This will be used as the default num_retries value when get() and
621           put() are called.  Default 0.
622         """
623         self.lock = threading.Lock()
624         if proxy is None:
625             proxy = config.get('ARVADOS_KEEP_PROXY')
626         if api_token is None:
627             if api_client is None:
628                 api_token = config.get('ARVADOS_API_TOKEN')
629             else:
630                 api_token = api_client.api_token
631         elif api_client is not None:
632             raise ValueError(
633                 "can't build KeepClient with both API client and token")
634         if local_store is None:
635             local_store = os.environ.get('KEEP_LOCAL_STORE')
636
637         self.block_cache = block_cache if block_cache else KeepBlockCache()
638         self.timeout = timeout
639         self.proxy_timeout = proxy_timeout
640         self._user_agent_pool = Queue.LifoQueue()
641
642         if local_store:
643             self.local_store = local_store
644             self.get = self.local_store_get
645             self.put = self.local_store_put
646         else:
647             self.num_retries = num_retries
648             if proxy:
649                 if not proxy.endswith('/'):
650                     proxy += '/'
651                 self.api_token = api_token
652                 self._gateway_services = {}
653                 self._keep_services = [{
654                     'uuid': 'proxy',
655                     'service_type': 'proxy',
656                     '_service_root': proxy,
657                     }]
658                 self._writable_services = self._keep_services
659                 self.using_proxy = True
660                 self._static_services_list = True
661                 self.max_replicas_per_service = None
662             else:
663                 # It's important to avoid instantiating an API client
664                 # unless we actually need one, for testing's sake.
665                 if api_client is None:
666                     api_client = arvados.api('v1')
667                 self.api_client = api_client
668                 self.api_token = api_client.api_token
669                 self._gateway_services = {}
670                 self._keep_services = None
671                 self._writable_services = None
672                 self.using_proxy = None
673                 self._static_services_list = False
674                 self.max_replicas_per_service = 1
675
676     def current_timeout(self, attempt_number):
677         """Return the appropriate timeout to use for this client.
678
679         The proxy timeout setting if the backend service is currently a proxy,
680         the regular timeout setting otherwise.  The `attempt_number` indicates
681         how many times the operation has been tried already (starting from 0
682         for the first try), and scales the connection timeout portion of the
683         return value accordingly.
684
685         """
686         # TODO(twp): the timeout should be a property of a
687         # KeepService, not a KeepClient. See #4488.
688         t = self.proxy_timeout if self.using_proxy else self.timeout
689         return (t[0] * (1 << attempt_number), t[1])
690
691     def _any_nondisk_services(self, service_list):
692         return any(ks.get('service_type', 'disk') != 'disk'
693                    for ks in service_list)
694
695     def build_services_list(self, force_rebuild=False):
696         if (self._static_services_list or
697               (self._keep_services and not force_rebuild)):
698             return
699         with self.lock:
700             try:
701                 keep_services = self.api_client.keep_services().accessible()
702             except Exception:  # API server predates Keep services.
703                 keep_services = self.api_client.keep_disks().list()
704
705             # Gateway services are only used when specified by UUID,
706             # so there's nothing to gain by filtering them by
707             # service_type.
708             self._gateway_services = {ks['uuid']: ks for ks in
709                                       keep_services.execute()['items']}
710             if not self._gateway_services:
711                 raise arvados.errors.NoKeepServersError()
712
713             # Precompute the base URI for each service.
714             for r in self._gateway_services.itervalues():
715                 host = r['service_host']
716                 if not host.startswith('[') and host.find(':') >= 0:
717                     # IPv6 URIs must be formatted like http://[::1]:80/...
718                     host = '[' + host + ']'
719                 r['_service_root'] = "{}://{}:{:d}/".format(
720                     'https' if r['service_ssl_flag'] else 'http',
721                     host,
722                     r['service_port'])
723
724             _logger.debug(str(self._gateway_services))
725             self._keep_services = [
726                 ks for ks in self._gateway_services.itervalues()
727                 if not ks.get('service_type', '').startswith('gateway:')]
728             self._writable_services = [ks for ks in self._keep_services
729                                        if not ks.get('read_only')]
730
731             # For disk type services, max_replicas_per_service is 1
732             # It is unknown (unlimited) for other service types.
733             if self._any_nondisk_services(self._writable_services):
734                 self.max_replicas_per_service = None
735             else:
736                 self.max_replicas_per_service = 1
737
738     def _service_weight(self, data_hash, service_uuid):
739         """Compute the weight of a Keep service endpoint for a data
740         block with a known hash.
741
742         The weight is md5(h + u) where u is the last 15 characters of
743         the service endpoint's UUID.
744         """
745         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
746
747     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
748         """Return an array of Keep service endpoints, in the order in
749         which they should be probed when reading or writing data with
750         the given hash+hints.
751         """
752         self.build_services_list(force_rebuild)
753
754         sorted_roots = []
755         # Use the services indicated by the given +K@... remote
756         # service hints, if any are present and can be resolved to a
757         # URI.
758         for hint in locator.hints:
759             if hint.startswith('K@'):
760                 if len(hint) == 7:
761                     sorted_roots.append(
762                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
763                 elif len(hint) == 29:
764                     svc = self._gateway_services.get(hint[2:])
765                     if svc:
766                         sorted_roots.append(svc['_service_root'])
767
768         # Sort the available local services by weight (heaviest first)
769         # for this locator, and return their service_roots (base URIs)
770         # in that order.
771         use_services = self._keep_services
772         if need_writable:
773             use_services = self._writable_services
774         self.using_proxy = self._any_nondisk_services(use_services)
775         sorted_roots.extend([
776             svc['_service_root'] for svc in sorted(
777                 use_services,
778                 reverse=True,
779                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
780         _logger.debug("{}: {}".format(locator, sorted_roots))
781         return sorted_roots
782
783     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
784         # roots_map is a dictionary, mapping Keep service root strings
785         # to KeepService objects.  Poll for Keep services, and add any
786         # new ones to roots_map.  Return the current list of local
787         # root strings.
788         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
789         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
790         for root in local_roots:
791             if root not in roots_map:
792                 roots_map[root] = self.KeepService(
793                     root, self._user_agent_pool, **headers)
794         return local_roots
795
796     @staticmethod
797     def _check_loop_result(result):
798         # KeepClient RetryLoops should save results as a 2-tuple: the
799         # actual result of the request, and the number of servers available
800         # to receive the request this round.
801         # This method returns True if there's a real result, False if
802         # there are no more servers available, otherwise None.
803         if isinstance(result, Exception):
804             return None
805         result, tried_server_count = result
806         if (result is not None) and (result is not False):
807             return True
808         elif tried_server_count < 1:
809             _logger.info("No more Keep services to try; giving up")
810             return False
811         else:
812             return None
813
814     def get_from_cache(self, loc):
815         """Fetch a block only if is in the cache, otherwise return None."""
816         slot = self.block_cache.get(loc)
817         if slot is not None and slot.ready.is_set():
818             return slot.get()
819         else:
820             return None
821
822     @retry.retry_method
823     def get(self, loc_s, num_retries=None):
824         """Get data from Keep.
825
826         This method fetches one or more blocks of data from Keep.  It
827         sends a request each Keep service registered with the API
828         server (or the proxy provided when this client was
829         instantiated), then each service named in location hints, in
830         sequence.  As soon as one service provides the data, it's
831         returned.
832
833         Arguments:
834         * loc_s: A string of one or more comma-separated locators to fetch.
835           This method returns the concatenation of these blocks.
836         * num_retries: The number of times to retry GET requests to
837           *each* Keep server if it returns temporary failures, with
838           exponential backoff.  Note that, in each loop, the method may try
839           to fetch data from every available Keep service, along with any
840           that are named in location hints in the locator.  The default value
841           is set when the KeepClient is initialized.
842         """
843         if ',' in loc_s:
844             return ''.join(self.get(x) for x in loc_s.split(','))
845         locator = KeepLocator(loc_s)
846         slot, first = self.block_cache.reserve_cache(locator.md5sum)
847         if not first:
848             v = slot.get()
849             return v
850
851         # If the locator has hints specifying a prefix (indicating a
852         # remote keepproxy) or the UUID of a local gateway service,
853         # read data from the indicated service(s) instead of the usual
854         # list of local disk services.
855         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
856                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
857         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
858                            for hint in locator.hints if (
859                                    hint.startswith('K@') and
860                                    len(hint) == 29 and
861                                    self._gateway_services.get(hint[2:])
862                                    )])
863         # Map root URLs to their KeepService objects.
864         roots_map = {
865             root: self.KeepService(root, self._user_agent_pool)
866             for root in hint_roots
867         }
868
869         # See #3147 for a discussion of the loop implementation.  Highlights:
870         # * Refresh the list of Keep services after each failure, in case
871         #   it's being updated.
872         # * Retry until we succeed, we're out of retries, or every available
873         #   service has returned permanent failure.
874         sorted_roots = []
875         roots_map = {}
876         blob = None
877         loop = retry.RetryLoop(num_retries, self._check_loop_result,
878                                backoff_start=2)
879         for tries_left in loop:
880             try:
881                 sorted_roots = self.map_new_services(
882                     roots_map, locator,
883                     force_rebuild=(tries_left < num_retries),
884                     need_writable=False)
885             except Exception as error:
886                 loop.save_result(error)
887                 continue
888
889             # Query KeepService objects that haven't returned
890             # permanent failure, in our specified shuffle order.
891             services_to_try = [roots_map[root]
892                                for root in sorted_roots
893                                if roots_map[root].usable()]
894             for keep_service in services_to_try:
895                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
896                 if blob is not None:
897                     break
898             loop.save_result((blob, len(services_to_try)))
899
900         # Always cache the result, then return it if we succeeded.
901         slot.set(blob)
902         self.block_cache.cap_cache()
903         if loop.success():
904             return blob
905
906         # Q: Including 403 is necessary for the Keep tests to continue
907         # passing, but maybe they should expect KeepReadError instead?
908         not_founds = sum(1 for key in sorted_roots
909                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
910         service_errors = ((key, roots_map[key].last_result()['error'])
911                           for key in sorted_roots)
912         if not roots_map:
913             raise arvados.errors.KeepReadError(
914                 "failed to read {}: no Keep services available ({})".format(
915                     loc_s, loop.last_result()))
916         elif not_founds == len(sorted_roots):
917             raise arvados.errors.NotFoundError(
918                 "{} not found".format(loc_s), service_errors)
919         else:
920             raise arvados.errors.KeepReadError(
921                 "failed to read {}".format(loc_s), service_errors, label="service")
922
923     @retry.retry_method
924     def put(self, data, copies=2, num_retries=None):
925         """Save data in Keep.
926
927         This method will get a list of Keep services from the API server, and
928         send the data to each one simultaneously in a new thread.  Once the
929         uploads are finished, if enough copies are saved, this method returns
930         the most recent HTTP response body.  If requests fail to upload
931         enough copies, this method raises KeepWriteError.
932
933         Arguments:
934         * data: The string of data to upload.
935         * copies: The number of copies that the user requires be saved.
936           Default 2.
937         * num_retries: The number of times to retry PUT requests to
938           *each* Keep server if it returns temporary failures, with
939           exponential backoff.  The default value is set when the
940           KeepClient is initialized.
941         """
942
943         if isinstance(data, unicode):
944             data = data.encode("ascii")
945         elif not isinstance(data, str):
946             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
947
948         data_hash = hashlib.md5(data).hexdigest()
949         loc_s = data_hash + '+' + str(len(data))
950         if copies < 1:
951             return loc_s
952         locator = KeepLocator(loc_s)
953
954         headers = {}
955         # Tell the proxy how many copies we want it to store
956         headers['X-Keep-Desired-Replication'] = str(copies)
957         roots_map = {}
958         thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
959         loop = retry.RetryLoop(num_retries, self._check_loop_result,
960                                backoff_start=2)
961         thread_sequence = 0
962         for tries_left in loop:
963             try:
964                 sorted_roots = self.map_new_services(
965                     roots_map, locator,
966                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
967             except Exception as error:
968                 loop.save_result(error)
969                 continue
970
971             threads = []
972             for service_root, ks in [(root, roots_map[root])
973                                      for root in sorted_roots]:
974                 if ks.finished():
975                     continue
976                 t = KeepClient.KeepWriterThread(
977                     ks,
978                     data=data,
979                     data_hash=data_hash,
980                     service_root=service_root,
981                     thread_limiter=thread_limiter,
982                     timeout=self.current_timeout(num_retries-tries_left),
983                     thread_sequence=thread_sequence)
984                 t.start()
985                 threads.append(t)
986                 thread_sequence += 1
987             for t in threads:
988                 t.join()
989             loop.save_result((thread_limiter.done() >= copies, len(threads)))
990
991         if loop.success():
992             return thread_limiter.response()
993         if not roots_map:
994             raise arvados.errors.KeepWriteError(
995                 "failed to write {}: no Keep services available ({})".format(
996                     data_hash, loop.last_result()))
997         else:
998             service_errors = ((key, roots_map[key].last_result()['error'])
999                               for key in sorted_roots
1000                               if roots_map[key].last_result()['error'])
1001             raise arvados.errors.KeepWriteError(
1002                 "failed to write {} (wanted {} copies but wrote {})".format(
1003                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
1004
1005     def local_store_put(self, data, copies=1, num_retries=None):
1006         """A stub for put().
1007
1008         This method is used in place of the real put() method when
1009         using local storage (see constructor's local_store argument).
1010
1011         copies and num_retries arguments are ignored: they are here
1012         only for the sake of offering the same call signature as
1013         put().
1014
1015         Data stored this way can be retrieved via local_store_get().
1016         """
1017         md5 = hashlib.md5(data).hexdigest()
1018         locator = '%s+%d' % (md5, len(data))
1019         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1020             f.write(data)
1021         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1022                   os.path.join(self.local_store, md5))
1023         return locator
1024
1025     def local_store_get(self, loc_s, num_retries=None):
1026         """Companion to local_store_put()."""
1027         try:
1028             locator = KeepLocator(loc_s)
1029         except ValueError:
1030             raise arvados.errors.NotFoundError(
1031                 "Invalid data locator: '%s'" % loc_s)
1032         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1033             return ''
1034         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1035             return f.read()
1036
1037     def is_cached(self, locator):
1038         return self.block_cache.reserve_cache(expect_hash)