b3ccc3eb6315c47e03b73af36075e1a6f0e6b355
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import os
6 import pycurl
7 import Queue
8 import re
9 import socket
10 import ssl
11 import threading
12 import timer
13
14 import arvados
15 import arvados.config as config
16 import arvados.errors
17 import arvados.retry as retry
18 import arvados.util
19
20 _logger = logging.getLogger('arvados.keep')
21 global_client_object = None
22
23
24 class KeepLocator(object):
25     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
26     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
27
28     def __init__(self, locator_str):
29         self.hints = []
30         self._perm_sig = None
31         self._perm_expiry = None
32         pieces = iter(locator_str.split('+'))
33         self.md5sum = next(pieces)
34         try:
35             self.size = int(next(pieces))
36         except StopIteration:
37             self.size = None
38         for hint in pieces:
39             if self.HINT_RE.match(hint) is None:
40                 raise ValueError("invalid hint format: {}".format(hint))
41             elif hint.startswith('A'):
42                 self.parse_permission_hint(hint)
43             else:
44                 self.hints.append(hint)
45
46     def __str__(self):
47         return '+'.join(
48             str(s) for s in [self.md5sum, self.size,
49                              self.permission_hint()] + self.hints
50             if s is not None)
51
52     def stripped(self):
53         if self.size is not None:
54             return "%s+%i" % (self.md5sum, self.size)
55         else:
56             return self.md5sum
57
58     def _make_hex_prop(name, length):
59         # Build and return a new property with the given name that
60         # must be a hex string of the given length.
61         data_name = '_{}'.format(name)
62         def getter(self):
63             return getattr(self, data_name)
64         def setter(self, hex_str):
65             if not arvados.util.is_hex(hex_str, length):
66                 raise ValueError("{} is not a {}-digit hex string: {}".
67                                  format(name, length, hex_str))
68             setattr(self, data_name, hex_str)
69         return property(getter, setter)
70
71     md5sum = _make_hex_prop('md5sum', 32)
72     perm_sig = _make_hex_prop('perm_sig', 40)
73
74     @property
75     def perm_expiry(self):
76         return self._perm_expiry
77
78     @perm_expiry.setter
79     def perm_expiry(self, value):
80         if not arvados.util.is_hex(value, 1, 8):
81             raise ValueError(
82                 "permission timestamp must be a hex Unix timestamp: {}".
83                 format(value))
84         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
85
86     def permission_hint(self):
87         data = [self.perm_sig, self.perm_expiry]
88         if None in data:
89             return None
90         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
91         return "A{}@{:08x}".format(*data)
92
93     def parse_permission_hint(self, s):
94         try:
95             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
96         except IndexError:
97             raise ValueError("bad permission hint {}".format(s))
98
99     def permission_expired(self, as_of_dt=None):
100         if self.perm_expiry is None:
101             return False
102         elif as_of_dt is None:
103             as_of_dt = datetime.datetime.now()
104         return self.perm_expiry <= as_of_dt
105
106
107 class Keep(object):
108     """Simple interface to a global KeepClient object.
109
110     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
111     own API client.  The global KeepClient will build an API client from the
112     current Arvados configuration, which may not match the one you built.
113     """
114     _last_key = None
115
116     @classmethod
117     def global_client_object(cls):
118         global global_client_object
119         # Previously, KeepClient would change its behavior at runtime based
120         # on these configuration settings.  We simulate that behavior here
121         # by checking the values and returning a new KeepClient if any of
122         # them have changed.
123         key = (config.get('ARVADOS_API_HOST'),
124                config.get('ARVADOS_API_TOKEN'),
125                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
126                config.get('ARVADOS_KEEP_PROXY'),
127                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
128                os.environ.get('KEEP_LOCAL_STORE'))
129         if (global_client_object is None) or (cls._last_key != key):
130             global_client_object = KeepClient()
131             cls._last_key = key
132         return global_client_object
133
134     @staticmethod
135     def get(locator, **kwargs):
136         return Keep.global_client_object().get(locator, **kwargs)
137
138     @staticmethod
139     def put(data, **kwargs):
140         return Keep.global_client_object().put(data, **kwargs)
141
142 class KeepBlockCache(object):
143     # Default RAM cache is 256MiB
144     def __init__(self, cache_max=(256 * 1024 * 1024)):
145         self.cache_max = cache_max
146         self._cache = []
147         self._cache_lock = threading.Lock()
148
149     class CacheSlot(object):
150         def __init__(self, locator):
151             self.locator = locator
152             self.ready = threading.Event()
153             self.content = None
154
155         def get(self):
156             self.ready.wait()
157             return self.content
158
159         def set(self, value):
160             self.content = value
161             self.ready.set()
162
163         def size(self):
164             if self.content is None:
165                 return 0
166             else:
167                 return len(self.content)
168
169     def cap_cache(self):
170         '''Cap the cache size to self.cache_max'''
171         with self._cache_lock:
172             # Select all slots except those where ready.is_set() and content is
173             # None (that means there was an error reading the block).
174             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
175             sm = sum([slot.size() for slot in self._cache])
176             while len(self._cache) > 0 and sm > self.cache_max:
177                 for i in xrange(len(self._cache)-1, -1, -1):
178                     if self._cache[i].ready.is_set():
179                         del self._cache[i]
180                         break
181                 sm = sum([slot.size() for slot in self._cache])
182
183     def _get(self, locator):
184         # Test if the locator is already in the cache
185         for i in xrange(0, len(self._cache)):
186             if self._cache[i].locator == locator:
187                 n = self._cache[i]
188                 if i != 0:
189                     # move it to the front
190                     del self._cache[i]
191                     self._cache.insert(0, n)
192                 return n
193         return None
194
195     def get(self, locator):
196         with self._cache_lock:
197             return self._get(locator)
198
199     def reserve_cache(self, locator):
200         '''Reserve a cache slot for the specified locator,
201         or return the existing slot.'''
202         with self._cache_lock:
203             n = self._get(locator)
204             if n:
205                 return n, False
206             else:
207                 # Add a new cache slot for the locator
208                 n = KeepBlockCache.CacheSlot(locator)
209                 self._cache.insert(0, n)
210                 return n, True
211
212 class KeepClient(object):
213
214     # Default Keep server connection timeout:  2 seconds
215     # Default Keep server read timeout:      300 seconds
216     # Default Keep proxy connection timeout:  20 seconds
217     # Default Keep proxy read timeout:       300 seconds
218     DEFAULT_TIMEOUT = (2, 300)
219     DEFAULT_PROXY_TIMEOUT = (20, 300)
220
221     class ThreadLimiter(object):
222         """
223         Limit the number of threads running at a given time to
224         {desired successes} minus {successes reported}. When successes
225         reported == desired, wake up the remaining threads and tell
226         them to quit.
227
228         Should be used in a "with" block.
229         """
230         def __init__(self, todo):
231             self._started = 0
232             self._todo = todo
233             self._done = 0
234             self._response = None
235             self._start_lock = threading.Condition()
236             self._todo_lock = threading.Semaphore(todo)
237             self._done_lock = threading.Lock()
238             self._local = threading.local()
239
240         def __enter__(self):
241             self._start_lock.acquire()
242             if getattr(self._local, 'sequence', None) is not None:
243                 # If the calling thread has used set_sequence(N), then
244                 # we wait here until N other threads have started.
245                 while self._started < self._local.sequence:
246                     self._start_lock.wait()
247             self._todo_lock.acquire()
248             self._started += 1
249             self._start_lock.notifyAll()
250             self._start_lock.release()
251             return self
252
253         def __exit__(self, type, value, traceback):
254             self._todo_lock.release()
255
256         def set_sequence(self, sequence):
257             self._local.sequence = sequence
258
259         def shall_i_proceed(self):
260             """
261             Return true if the current thread should do stuff. Return
262             false if the current thread should just stop.
263             """
264             with self._done_lock:
265                 return (self._done < self._todo)
266
267         def save_response(self, response_body, replicas_stored):
268             """
269             Records a response body (a locator, possibly signed) returned by
270             the Keep server.  It is not necessary to save more than
271             one response, since we presume that any locator returned
272             in response to a successful request is valid.
273             """
274             with self._done_lock:
275                 self._done += replicas_stored
276                 self._response = response_body
277
278         def response(self):
279             """
280             Returns the body from the response to a PUT request.
281             """
282             with self._done_lock:
283                 return self._response
284
285         def done(self):
286             """
287             Return how many successes were reported.
288             """
289             with self._done_lock:
290                 return self._done
291
292
293     class KeepService(object):
294         """Make requests to a single Keep service, and track results.
295
296         A KeepService is intended to last long enough to perform one
297         transaction (GET or PUT) against one Keep service. This can
298         involve calling either get() or put() multiple times in order
299         to retry after transient failures. However, calling both get()
300         and put() on a single instance -- or using the same instance
301         to access two different Keep services -- will not produce
302         sensible behavior.
303         """
304
305         HTTP_ERRORS = (
306             socket.error,
307             ssl.SSLError,
308             arvados.errors.HttpError,
309         )
310
311         def __init__(self, root, user_agent_pool=Queue.LifoQueue(), **headers):
312             self.root = root
313             self._user_agent_pool = user_agent_pool
314             self._result = {'error': None}
315             self._usable = True
316             self._session = None
317             self.get_headers = {'Accept': 'application/octet-stream'}
318             self.get_headers.update(headers)
319             self.put_headers = headers
320
321         def usable(self):
322             """Is it worth attempting a request?"""
323             return self._usable
324
325         def finished(self):
326             """Did the request succeed or encounter permanent failure?"""
327             return self._result['error'] == False or not self._usable
328
329         def last_result(self):
330             return self._result
331
332         def _get_user_agent(self):
333             try:
334                 return self._user_agent_pool.get(False)
335             except Queue.Empty:
336                 return pycurl.Curl()
337
338         def _put_user_agent(self, ua):
339             try:
340                 ua.reset()
341                 self._user_agent_pool.put(ua, False)
342             except:
343                 ua.close()
344
345         @staticmethod
346         def _socket_open(family, socktype, protocol, address=None):
347             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
348             s = socket.socket(family, socktype, protocol)
349             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
350             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
351             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
352             return s
353
354         def get(self, locator, timeout=None):
355             # locator is a KeepLocator object.
356             url = self.root + str(locator)
357             _logger.debug("Request: GET %s", url)
358             curl = self._get_user_agent()
359             try:
360                 with timer.Timer() as t:
361                     self._headers = {}
362                     response_body = cStringIO.StringIO()
363                     curl.setopt(pycurl.NOSIGNAL, 1)
364                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
365                     curl.setopt(pycurl.URL, url.encode('utf-8'))
366                     curl.setopt(pycurl.HTTPHEADER, [
367                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
368                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
369                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
370                     self._setcurltimeouts(curl, timeout)
371                     try:
372                         curl.perform()
373                     except Exception as e:
374                         raise arvados.errors.HttpError(0, str(e))
375                     self._result = {
376                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
377                         'body': response_body.getvalue(),
378                         'headers': self._headers,
379                         'error': False,
380                     }
381                 ok = retry.check_http_response_success(self._result['status_code'])
382                 if not ok:
383                     self._result['error'] = arvados.errors.HttpError(
384                         self._result['status_code'],
385                         self._headers.get('x-status-line', 'Error'))
386             except self.HTTP_ERRORS as e:
387                 self._result = {
388                     'error': e,
389                 }
390                 ok = False
391             self._usable = ok != False
392             if self._result.get('status_code', None):
393                 # The client worked well enough to get an HTTP status
394                 # code, so presumably any problems are just on the
395                 # server side and it's OK to reuse the client.
396                 self._put_user_agent(curl)
397             else:
398                 # Don't return this client to the pool, in case it's
399                 # broken.
400                 curl.close()
401             if not ok:
402                 _logger.debug("Request fail: GET %s => %s: %s",
403                               url, type(self._result['error']), str(self._result['error']))
404                 return None
405             _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
406                          self._result['status_code'],
407                          len(self._result['body']),
408                          t.msecs,
409                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
410             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
411             if resp_md5 != locator.md5sum:
412                 _logger.warning("Checksum fail: md5(%s) = %s",
413                                 url, resp_md5)
414                 self._result['error'] = arvados.errors.HttpError(
415                     0, 'Checksum fail')
416                 return None
417             return self._result['body']
418
419         def put(self, hash_s, body, timeout=None):
420             url = self.root + hash_s
421             _logger.debug("Request: PUT %s", url)
422             curl = self._get_user_agent()
423             try:
424                 self._headers = {}
425                 body_reader = cStringIO.StringIO(body)
426                 response_body = cStringIO.StringIO()
427                 curl.setopt(pycurl.NOSIGNAL, 1)
428                 curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
429                 curl.setopt(pycurl.URL, url.encode('utf-8'))
430                 # Using UPLOAD tells cURL to wait for a "go ahead" from the
431                 # Keep server (in the form of a HTTP/1.1 "100 Continue"
432                 # response) instead of sending the request body immediately.
433                 # This allows the server to reject the request if the request
434                 # is invalid or the server is read-only, without waiting for
435                 # the client to send the entire block.
436                 curl.setopt(pycurl.UPLOAD, True)
437                 curl.setopt(pycurl.INFILESIZE, len(body))
438                 curl.setopt(pycurl.READFUNCTION, body_reader.read)
439                 curl.setopt(pycurl.HTTPHEADER, [
440                     '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
441                 curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
442                 curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
443                 self._setcurltimeouts(curl, timeout)
444                 try:
445                     curl.perform()
446                 except Exception as e:
447                     raise arvados.errors.HttpError(0, str(e))
448                 self._result = {
449                     'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
450                     'body': response_body.getvalue(),
451                     'headers': self._headers,
452                     'error': False,
453                 }
454                 ok = retry.check_http_response_success(self._result['status_code'])
455                 if not ok:
456                     self._result['error'] = arvados.errors.HttpError(
457                         self._result['status_code'],
458                         self._headers.get('x-status-line', 'Error'))
459             except self.HTTP_ERRORS as e:
460                 self._result = {
461                     'error': e,
462                 }
463                 ok = False
464             self._usable = ok != False # still usable if ok is True or None
465             if self._result.get('status_code', None):
466                 # Client is functional. See comment in get().
467                 self._put_user_agent(curl)
468             else:
469                 curl.close()
470             if not ok:
471                 _logger.debug("Request fail: PUT %s => %s: %s",
472                               url, type(self._result['error']), str(self._result['error']))
473                 return False
474             return True
475
476         def _setcurltimeouts(self, curl, timeouts):
477             if not timeouts:
478                 return
479             elif isinstance(timeouts, tuple):
480                 conn_t, xfer_t = timeouts
481             else:
482                 conn_t, xfer_t = (timeouts, timeouts)
483             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
484             curl.setopt(pycurl.TIMEOUT_MS, int(xfer_t*1000))
485
486         def _headerfunction(self, header_line):
487             header_line = header_line.decode('iso-8859-1')
488             if ':' in header_line:
489                 name, value = header_line.split(':', 1)
490                 name = name.strip().lower()
491                 value = value.strip()
492             elif self._headers:
493                 name = self._lastheadername
494                 value = self._headers[name] + ' ' + header_line.strip()
495             elif header_line.startswith('HTTP/'):
496                 name = 'x-status-line'
497                 value = header_line
498             else:
499                 _logger.error("Unexpected header line: %s", header_line)
500                 return
501             self._lastheadername = name
502             self._headers[name] = value
503             # Returning None implies all bytes were written
504
505
506     class KeepWriterThread(threading.Thread):
507         """
508         Write a blob of data to the given Keep server. On success, call
509         save_response() of the given ThreadLimiter to save the returned
510         locator.
511         """
512         def __init__(self, keep_service, **kwargs):
513             super(KeepClient.KeepWriterThread, self).__init__()
514             self.service = keep_service
515             self.args = kwargs
516             self._success = False
517
518         def success(self):
519             return self._success
520
521         def run(self):
522             limiter = self.args['thread_limiter']
523             sequence = self.args['thread_sequence']
524             if sequence is not None:
525                 limiter.set_sequence(sequence)
526             with limiter:
527                 if not limiter.shall_i_proceed():
528                     # My turn arrived, but the job has been done without
529                     # me.
530                     return
531                 self.run_with_limiter(limiter)
532
533         def run_with_limiter(self, limiter):
534             if self.service.finished():
535                 return
536             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
537                           str(threading.current_thread()),
538                           self.args['data_hash'],
539                           len(self.args['data']),
540                           self.args['service_root'])
541             self._success = bool(self.service.put(
542                 self.args['data_hash'],
543                 self.args['data'],
544                 timeout=self.args.get('timeout', None)))
545             result = self.service.last_result()
546             if self._success:
547                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
548                               str(threading.current_thread()),
549                               self.args['data_hash'],
550                               len(self.args['data']),
551                               self.args['service_root'])
552                 # Tick the 'done' counter for the number of replica
553                 # reported stored by the server, for the case that
554                 # we're talking to a proxy or other backend that
555                 # stores to multiple copies for us.
556                 try:
557                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
558                 except (KeyError, ValueError):
559                     replicas_stored = 1
560                 limiter.save_response(result['body'].strip(), replicas_stored)
561             elif result.get('status_code', None):
562                 _logger.debug("Request fail: PUT %s => %s %s",
563                               self.args['data_hash'],
564                               result['status_code'],
565                               result['body'])
566
567
568     def __init__(self, api_client=None, proxy=None,
569                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
570                  api_token=None, local_store=None, block_cache=None,
571                  num_retries=0, session=None):
572         """Initialize a new KeepClient.
573
574         Arguments:
575         :api_client:
576           The API client to use to find Keep services.  If not
577           provided, KeepClient will build one from available Arvados
578           configuration.
579
580         :proxy:
581           If specified, this KeepClient will send requests to this Keep
582           proxy.  Otherwise, KeepClient will fall back to the setting of the
583           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
584           KeepClient does not use a proxy, pass in an empty string.
585
586         :timeout:
587           The initial timeout (in seconds) for HTTP requests to Keep
588           non-proxy servers.  A tuple of two floats is interpreted as
589           (connection_timeout, read_timeout): see
590           http://docs.python-requests.org/en/latest/user/advanced/#timeouts.
591           Because timeouts are often a result of transient server load, the
592           actual connection timeout will be increased by a factor of two on
593           each retry.
594           Default: (2, 300).
595
596         :proxy_timeout:
597           The initial timeout (in seconds) for HTTP requests to
598           Keep proxies. A tuple of two floats is interpreted as
599           (connection_timeout, read_timeout). The behavior described
600           above for adjusting connection timeouts on retry also applies.
601           Default: (20, 300).
602
603         :api_token:
604           If you're not using an API client, but only talking
605           directly to a Keep proxy, this parameter specifies an API token
606           to authenticate Keep requests.  It is an error to specify both
607           api_client and api_token.  If you specify neither, KeepClient
608           will use one available from the Arvados configuration.
609
610         :local_store:
611           If specified, this KeepClient will bypass Keep
612           services, and save data to the named directory.  If unspecified,
613           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
614           environment variable.  If you want to ensure KeepClient does not
615           use local storage, pass in an empty string.  This is primarily
616           intended to mock a server for testing.
617
618         :num_retries:
619           The default number of times to retry failed requests.
620           This will be used as the default num_retries value when get() and
621           put() are called.  Default 0.
622         """
623         self.lock = threading.Lock()
624         if proxy is None:
625             proxy = config.get('ARVADOS_KEEP_PROXY')
626         if api_token is None:
627             if api_client is None:
628                 api_token = config.get('ARVADOS_API_TOKEN')
629             else:
630                 api_token = api_client.api_token
631         elif api_client is not None:
632             raise ValueError(
633                 "can't build KeepClient with both API client and token")
634         if local_store is None:
635             local_store = os.environ.get('KEEP_LOCAL_STORE')
636
637         self.block_cache = block_cache if block_cache else KeepBlockCache()
638         self.timeout = timeout
639         self.proxy_timeout = proxy_timeout
640         self._user_agent_pool = Queue.LifoQueue()
641
642         if local_store:
643             self.local_store = local_store
644             self.get = self.local_store_get
645             self.put = self.local_store_put
646         else:
647             self.num_retries = num_retries
648             if proxy:
649                 if not proxy.endswith('/'):
650                     proxy += '/'
651                 self.api_token = api_token
652                 self._gateway_services = {}
653                 self._keep_services = [{
654                     'uuid': 'proxy',
655                     '_service_root': proxy,
656                     }]
657                 self._writable_services = self._keep_services
658                 self.using_proxy = True
659                 self._static_services_list = True
660                 self.max_replicas_per_service = 1
661             else:
662                 # It's important to avoid instantiating an API client
663                 # unless we actually need one, for testing's sake.
664                 if api_client is None:
665                     api_client = arvados.api('v1')
666                 self.api_client = api_client
667                 self.api_token = api_client.api_token
668                 self._gateway_services = {}
669                 self._keep_services = None
670                 self._writable_services = None
671                 self.using_proxy = None
672                 self._static_services_list = False
673                 self.max_replicas_per_service = 1
674
675     def current_timeout(self, attempt_number):
676         """Return the appropriate timeout to use for this client.
677
678         The proxy timeout setting if the backend service is currently a proxy,
679         the regular timeout setting otherwise.  The `attempt_number` indicates
680         how many times the operation has been tried already (starting from 0
681         for the first try), and scales the connection timeout portion of the
682         return value accordingly.
683
684         """
685         # TODO(twp): the timeout should be a property of a
686         # KeepService, not a KeepClient. See #4488.
687         t = self.proxy_timeout if self.using_proxy else self.timeout
688         return (t[0] * (1 << attempt_number), t[1])
689
690     def build_services_list(self, force_rebuild=False):
691         if (self._static_services_list or
692               (self._keep_services and not force_rebuild)):
693             return
694         with self.lock:
695             try:
696                 keep_services = self.api_client.keep_services().accessible()
697             except Exception:  # API server predates Keep services.
698                 keep_services = self.api_client.keep_disks().list()
699
700             accessible = keep_services.execute().get('items')
701             if not accessible:
702                 raise arvados.errors.NoKeepServersError()
703
704             # Precompute the base URI for each service.
705             for r in accessible:
706                 host = r['service_host']
707                 if not host.startswith('[') and host.find(':') >= 0:
708                     # IPv6 URIs must be formatted like http://[::1]:80/...
709                     host = '[' + host + ']'
710                 r['_service_root'] = "{}://{}:{:d}/".format(
711                     'https' if r['service_ssl_flag'] else 'http',
712                     host,
713                     r['service_port'])
714
715             # Gateway services are only used when specified by UUID,
716             # so there's nothing to gain by filtering them by
717             # service_type.
718             self._gateway_services = {ks.get('uuid'): ks for ks in accessible}
719             _logger.debug(str(self._gateway_services))
720
721             self._keep_services = [
722                 ks for ks in accessible
723                 if ks.get('service_type') in ['disk', 'proxy']]
724             self._writable_services = [
725                 ks for ks in accessible
726                 if (ks.get('service_type') in ['disk', 'proxy']) and (True != ks.get('read_only'))]
727             _logger.debug(str(self._keep_services))
728
729             self.using_proxy = any(ks.get('service_type') == 'proxy'
730                                    for ks in self._keep_services)
731             # For disk type services, max_replicas_per_service is 1
732             # It is unknown or unlimited for non-disk typed services.
733             for ks in accessible:
734                 if ('disk' != ks.get('service_type')) and (not ks.get('read_only')):
735                     self.max_replicas_per_service = None
736
737     def _service_weight(self, data_hash, service_uuid):
738         """Compute the weight of a Keep service endpoint for a data
739         block with a known hash.
740
741         The weight is md5(h + u) where u is the last 15 characters of
742         the service endpoint's UUID.
743         """
744         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
745
746     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
747         """Return an array of Keep service endpoints, in the order in
748         which they should be probed when reading or writing data with
749         the given hash+hints.
750         """
751         self.build_services_list(force_rebuild)
752
753         sorted_roots = []
754         # Use the services indicated by the given +K@... remote
755         # service hints, if any are present and can be resolved to a
756         # URI.
757         for hint in locator.hints:
758             if hint.startswith('K@'):
759                 if len(hint) == 7:
760                     sorted_roots.append(
761                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
762                 elif len(hint) == 29:
763                     svc = self._gateway_services.get(hint[2:])
764                     if svc:
765                         sorted_roots.append(svc['_service_root'])
766
767         # Sort the available local services by weight (heaviest first)
768         # for this locator, and return their service_roots (base URIs)
769         # in that order.
770         use_services = self._keep_services
771         if need_writable:
772           use_services = self._writable_services
773         sorted_roots.extend([
774             svc['_service_root'] for svc in sorted(
775                 use_services,
776                 reverse=True,
777                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
778         _logger.debug("{}: {}".format(locator, sorted_roots))
779         return sorted_roots
780
781     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
782         # roots_map is a dictionary, mapping Keep service root strings
783         # to KeepService objects.  Poll for Keep services, and add any
784         # new ones to roots_map.  Return the current list of local
785         # root strings.
786         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
787         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
788         for root in local_roots:
789             if root not in roots_map:
790                 roots_map[root] = self.KeepService(
791                     root, self._user_agent_pool, **headers)
792         return local_roots
793
794     @staticmethod
795     def _check_loop_result(result):
796         # KeepClient RetryLoops should save results as a 2-tuple: the
797         # actual result of the request, and the number of servers available
798         # to receive the request this round.
799         # This method returns True if there's a real result, False if
800         # there are no more servers available, otherwise None.
801         if isinstance(result, Exception):
802             return None
803         result, tried_server_count = result
804         if (result is not None) and (result is not False):
805             return True
806         elif tried_server_count < 1:
807             _logger.info("No more Keep services to try; giving up")
808             return False
809         else:
810             return None
811
812     def get_from_cache(self, loc):
813         """Fetch a block only if is in the cache, otherwise return None."""
814         slot = self.block_cache.get(loc)
815         if slot is not None and slot.ready.is_set():
816             return slot.get()
817         else:
818             return None
819
820     @retry.retry_method
821     def get(self, loc_s, num_retries=None):
822         """Get data from Keep.
823
824         This method fetches one or more blocks of data from Keep.  It
825         sends a request each Keep service registered with the API
826         server (or the proxy provided when this client was
827         instantiated), then each service named in location hints, in
828         sequence.  As soon as one service provides the data, it's
829         returned.
830
831         Arguments:
832         * loc_s: A string of one or more comma-separated locators to fetch.
833           This method returns the concatenation of these blocks.
834         * num_retries: The number of times to retry GET requests to
835           *each* Keep server if it returns temporary failures, with
836           exponential backoff.  Note that, in each loop, the method may try
837           to fetch data from every available Keep service, along with any
838           that are named in location hints in the locator.  The default value
839           is set when the KeepClient is initialized.
840         """
841         if ',' in loc_s:
842             return ''.join(self.get(x) for x in loc_s.split(','))
843         locator = KeepLocator(loc_s)
844         slot, first = self.block_cache.reserve_cache(locator.md5sum)
845         if not first:
846             v = slot.get()
847             return v
848
849         # If the locator has hints specifying a prefix (indicating a
850         # remote keepproxy) or the UUID of a local gateway service,
851         # read data from the indicated service(s) instead of the usual
852         # list of local disk services.
853         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
854                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
855         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
856                            for hint in locator.hints if (
857                                    hint.startswith('K@') and
858                                    len(hint) == 29 and
859                                    self._gateway_services.get(hint[2:])
860                                    )])
861         # Map root URLs to their KeepService objects.
862         roots_map = {
863             root: self.KeepService(root, self._user_agent_pool)
864             for root in hint_roots
865         }
866
867         # See #3147 for a discussion of the loop implementation.  Highlights:
868         # * Refresh the list of Keep services after each failure, in case
869         #   it's being updated.
870         # * Retry until we succeed, we're out of retries, or every available
871         #   service has returned permanent failure.
872         sorted_roots = []
873         roots_map = {}
874         blob = None
875         loop = retry.RetryLoop(num_retries, self._check_loop_result,
876                                backoff_start=2)
877         for tries_left in loop:
878             try:
879                 sorted_roots = self.map_new_services(
880                     roots_map, locator,
881                     force_rebuild=(tries_left < num_retries),
882                     need_writable=False)
883             except Exception as error:
884                 loop.save_result(error)
885                 continue
886
887             # Query KeepService objects that haven't returned
888             # permanent failure, in our specified shuffle order.
889             services_to_try = [roots_map[root]
890                                for root in sorted_roots
891                                if roots_map[root].usable()]
892             for keep_service in services_to_try:
893                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
894                 if blob is not None:
895                     break
896             loop.save_result((blob, len(services_to_try)))
897
898         # Always cache the result, then return it if we succeeded.
899         slot.set(blob)
900         self.block_cache.cap_cache()
901         if loop.success():
902             return blob
903
904         # Q: Including 403 is necessary for the Keep tests to continue
905         # passing, but maybe they should expect KeepReadError instead?
906         not_founds = sum(1 for key in sorted_roots
907                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
908         service_errors = ((key, roots_map[key].last_result()['error'])
909                           for key in sorted_roots)
910         if not roots_map:
911             raise arvados.errors.KeepReadError(
912                 "failed to read {}: no Keep services available ({})".format(
913                     loc_s, loop.last_result()))
914         elif not_founds == len(sorted_roots):
915             raise arvados.errors.NotFoundError(
916                 "{} not found".format(loc_s), service_errors)
917         else:
918             raise arvados.errors.KeepReadError(
919                 "failed to read {}".format(loc_s), service_errors, label="service")
920
921     @retry.retry_method
922     def put(self, data, copies=2, num_retries=None):
923         """Save data in Keep.
924
925         This method will get a list of Keep services from the API server, and
926         send the data to each one simultaneously in a new thread.  Once the
927         uploads are finished, if enough copies are saved, this method returns
928         the most recent HTTP response body.  If requests fail to upload
929         enough copies, this method raises KeepWriteError.
930
931         Arguments:
932         * data: The string of data to upload.
933         * copies: The number of copies that the user requires be saved.
934           Default 2.
935         * num_retries: The number of times to retry PUT requests to
936           *each* Keep server if it returns temporary failures, with
937           exponential backoff.  The default value is set when the
938           KeepClient is initialized.
939         """
940
941         if isinstance(data, unicode):
942             data = data.encode("ascii")
943         elif not isinstance(data, str):
944             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
945
946         data_hash = hashlib.md5(data).hexdigest()
947         loc_s = data_hash + '+' + str(len(data))
948         if copies < 1:
949             return loc_s
950         locator = KeepLocator(loc_s)
951
952         headers = {}
953         # Tell the proxy how many copies we want it to store
954         headers['X-Keep-Desired-Replication'] = str(copies)
955         roots_map = {}
956         thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
957         loop = retry.RetryLoop(num_retries, self._check_loop_result,
958                                backoff_start=2)
959         thread_sequence = 0
960         for tries_left in loop:
961             try:
962                 sorted_roots = self.map_new_services(
963                     roots_map, locator,
964                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
965             except Exception as error:
966                 loop.save_result(error)
967                 continue
968
969             threads = []
970             for service_root, ks in [(root, roots_map[root])
971                                      for root in sorted_roots]:
972                 if ks.finished():
973                     continue
974                 t = KeepClient.KeepWriterThread(
975                     ks,
976                     data=data,
977                     data_hash=data_hash,
978                     service_root=service_root,
979                     thread_limiter=thread_limiter,
980                     timeout=self.current_timeout(num_retries-tries_left),
981                     thread_sequence=thread_sequence)
982                 t.start()
983                 threads.append(t)
984                 thread_sequence += 1
985             for t in threads:
986                 t.join()
987             loop.save_result((thread_limiter.done() >= copies, len(threads)))
988
989         if loop.success():
990             return thread_limiter.response()
991         if not roots_map:
992             raise arvados.errors.KeepWriteError(
993                 "failed to write {}: no Keep services available ({})".format(
994                     data_hash, loop.last_result()))
995         else:
996             service_errors = ((key, roots_map[key].last_result()['error'])
997                               for key in sorted_roots
998                               if roots_map[key].last_result()['error'])
999             raise arvados.errors.KeepWriteError(
1000                 "failed to write {} (wanted {} copies but wrote {})".format(
1001                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
1002
1003     def local_store_put(self, data, copies=1, num_retries=None):
1004         """A stub for put().
1005
1006         This method is used in place of the real put() method when
1007         using local storage (see constructor's local_store argument).
1008
1009         copies and num_retries arguments are ignored: they are here
1010         only for the sake of offering the same call signature as
1011         put().
1012
1013         Data stored this way can be retrieved via local_store_get().
1014         """
1015         md5 = hashlib.md5(data).hexdigest()
1016         locator = '%s+%d' % (md5, len(data))
1017         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1018             f.write(data)
1019         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1020                   os.path.join(self.local_store, md5))
1021         return locator
1022
1023     def local_store_get(self, loc_s, num_retries=None):
1024         """Companion to local_store_put()."""
1025         try:
1026             locator = KeepLocator(loc_s)
1027         except ValueError:
1028             raise arvados.errors.NotFoundError(
1029                 "Invalid data locator: '%s'" % loc_s)
1030         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1031             return ''
1032         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1033             return f.read()
1034
1035     def is_cached(self, locator):
1036         return self.block_cache.reserve_cache(expect_hash)