Merge branch '6605-arv-copy-http' closes #6605
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import threading
13 import timer
14
15 import arvados
16 import arvados.config as config
17 import arvados.errors
18 import arvados.retry as retry
19 import arvados.util
20
21 _logger = logging.getLogger('arvados.keep')
22 global_client_object = None
23
24
class KeepLocator(object):
    """Parse and manipulate a Keep block locator string.

    A locator looks like "<md5sum>[+<size>][+<hints>...]".  Hints that
    match HINT_RE and start with 'A' are permission (signature) hints;
    all other hints are kept verbatim in self.hints.
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str; raises ValueError on malformed input."""
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed ("md5sum+size")."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        # datetime (UTC) when the permission signature expires, or None.
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<hex timestamp>" hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from an "A<sig>@<timestamp>" hint."""
        # Bug fix: a hint with no '@' made the tuple unpacking raise
        # ValueError, which the old "except IndexError" never caught
        # (s[1:] cannot raise IndexError), so callers got a cryptic
        # unpacking error instead of the intended message.
        parts = s[1:].split('@', 1)
        if len(parts) != 2:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig, self.perm_expiry = parts

    def permission_expired(self, as_of_dt=None):
        """True if the permission hint has expired (as of as_of_dt)."""
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # Bug fix: perm_expiry is UTC-based (utcfromtimestamp above),
            # so compare against UTC "now", not local datetime.now().
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
106
107
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Snapshot of the configuration used to build the current global client.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if config changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is None or key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared client."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the shared client."""
        return Keep.global_client_object().put(data, **kwargs)
142
class KeepBlockCache(object):
    """In-memory MRU cache of Keep blocks, shared between threads."""

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        # MRU order: index 0 is the most recently used slot.
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Placeholder for one block: readers wait on .ready until a
        writer delivers the content via set()."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the content arrives, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block and wake every waiting reader."""
            self.content = value
            self.ready.set()

        def size(self):
            """Bytes held; 0 while unfilled or after a failed fetch."""
            return 0 if self.content is None else len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Drop slots whose fetch finished with an error (ready, but
            # content is still None).
            self._cache = [slot for slot in self._cache
                           if not (slot.ready.is_set() and slot.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least recently used slot whose content has
                # actually arrived; in-flight slots are never evicted.
                for idx in range(len(self._cache) - 1, -1, -1):
                    if self._cache[idx].ready.is_set():
                        del self._cache[idx]
                        break
                total = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Linear scan; a hit is promoted to the MRU end (front).
        # Caller must hold self._cache_lock.
        for idx, slot in enumerate(self._cache):
            if slot.locator == locator:
                if idx != 0:
                    del self._cache[idx]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None (thread-safe)."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            slot = self._get(locator)
            if slot:
                return slot, False
            # New slot at the MRU end; the caller is now responsible for
            # fetching the block and calling slot.set().
            slot = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, slot)
            return slot, True
214
215
class Counter(object):
    """A thread-safe running total."""

    def __init__(self, v=0):
        # Guard every read and update so concurrent callers always see a
        # consistent value.
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically increase the total by v (which may be negative)."""
        with self._lock:
            self._value += v

    def get(self):
        """Return a consistent snapshot of the total."""
        with self._lock:
            return self._value
228
229
230 class KeepClient(object):
231
232     # Default Keep server connection timeout:  2 seconds
233     # Default Keep server read timeout:       256 seconds
234     # Default Keep server bandwidth minimum:  32768 bytes per second
235     # Default Keep proxy connection timeout:  20 seconds
236     # Default Keep proxy read timeout:        256 seconds
237     # Default Keep proxy bandwidth minimum:   32768 bytes per second
238     DEFAULT_TIMEOUT = (2, 256, 32768)
239     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
240
241     class ThreadLimiter(object):
242         """Limit the number of threads writing to Keep at once.
243
244         This ensures that only a number of writer threads that could
245         potentially achieve the desired replication level run at once.
246         Once the desired replication level is achieved, queued threads
247         are instructed not to run.
248
249         Should be used in a "with" block.
250         """
251         def __init__(self, want_copies, max_service_replicas):
252             self._started = 0
253             self._want_copies = want_copies
254             self._done = 0
255             self._response = None
256             self._start_lock = threading.Condition()
257             if (not max_service_replicas) or (max_service_replicas >= want_copies):
258                 max_threads = 1
259             else:
260                 max_threads = math.ceil(float(want_copies) / max_service_replicas)
261             _logger.debug("Limiter max threads is %d", max_threads)
262             self._todo_lock = threading.Semaphore(max_threads)
263             self._done_lock = threading.Lock()
264             self._local = threading.local()
265
266         def __enter__(self):
267             self._start_lock.acquire()
268             if getattr(self._local, 'sequence', None) is not None:
269                 # If the calling thread has used set_sequence(N), then
270                 # we wait here until N other threads have started.
271                 while self._started < self._local.sequence:
272                     self._start_lock.wait()
273             self._todo_lock.acquire()
274             self._started += 1
275             self._start_lock.notifyAll()
276             self._start_lock.release()
277             return self
278
279         def __exit__(self, type, value, traceback):
280             self._todo_lock.release()
281
282         def set_sequence(self, sequence):
283             self._local.sequence = sequence
284
285         def shall_i_proceed(self):
286             """
287             Return true if the current thread should write to Keep.
288             Return false otherwise.
289             """
290             with self._done_lock:
291                 return (self._done < self._want_copies)
292
293         def save_response(self, response_body, replicas_stored):
294             """
295             Records a response body (a locator, possibly signed) returned by
296             the Keep server, and the number of replicas it stored.
297             """
298             with self._done_lock:
299                 self._done += replicas_stored
300                 self._response = response_body
301
302         def response(self):
303             """Return the body from the response to a PUT request."""
304             with self._done_lock:
305                 return self._response
306
307         def done(self):
308             """Return the total number of replicas successfully stored."""
309             with self._done_lock:
310                 return self._done
311
312     class KeepService(object):
313         """Make requests to a single Keep service, and track results.
314
315         A KeepService is intended to last long enough to perform one
316         transaction (GET or PUT) against one Keep service. This can
317         involve calling either get() or put() multiple times in order
318         to retry after transient failures. However, calling both get()
319         and put() on a single instance -- or using the same instance
320         to access two different Keep services -- will not produce
321         sensible behavior.
322         """
323
324         HTTP_ERRORS = (
325             socket.error,
326             ssl.SSLError,
327             arvados.errors.HttpError,
328         )
329
330         def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
331                      upload_counter=None,
332                      download_counter=None, **headers):
333             self.root = root
334             self._user_agent_pool = user_agent_pool
335             self._result = {'error': None}
336             self._usable = True
337             self._session = None
338             self.get_headers = {'Accept': 'application/octet-stream'}
339             self.get_headers.update(headers)
340             self.put_headers = headers
341             self.upload_counter = upload_counter
342             self.download_counter = download_counter
343
344         def usable(self):
345             """Is it worth attempting a request?"""
346             return self._usable
347
348         def finished(self):
349             """Did the request succeed or encounter permanent failure?"""
350             return self._result['error'] == False or not self._usable
351
352         def last_result(self):
353             return self._result
354
355         def _get_user_agent(self):
356             try:
357                 return self._user_agent_pool.get(False)
358             except Queue.Empty:
359                 return pycurl.Curl()
360
361         def _put_user_agent(self, ua):
362             try:
363                 ua.reset()
364                 self._user_agent_pool.put(ua, False)
365             except:
366                 ua.close()
367
368         @staticmethod
369         def _socket_open(family, socktype, protocol, address=None):
370             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
371             s = socket.socket(family, socktype, protocol)
372             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
373             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
374             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
375             return s
376
377         def get(self, locator, timeout=None):
378             # locator is a KeepLocator object.
379             url = self.root + str(locator)
380             _logger.debug("Request: GET %s", url)
381             curl = self._get_user_agent()
382             ok = None
383             try:
384                 with timer.Timer() as t:
385                     self._headers = {}
386                     response_body = cStringIO.StringIO()
387                     curl.setopt(pycurl.NOSIGNAL, 1)
388                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
389                     curl.setopt(pycurl.URL, url.encode('utf-8'))
390                     curl.setopt(pycurl.HTTPHEADER, [
391                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
392                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
393                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
394                     self._setcurltimeouts(curl, timeout)
395                     try:
396                         curl.perform()
397                     except Exception as e:
398                         raise arvados.errors.HttpError(0, str(e))
399                     self._result = {
400                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
401                         'body': response_body.getvalue(),
402                         'headers': self._headers,
403                         'error': False,
404                     }
405                 ok = retry.check_http_response_success(self._result['status_code'])
406                 if not ok:
407                     self._result['error'] = arvados.errors.HttpError(
408                         self._result['status_code'],
409                         self._headers.get('x-status-line', 'Error'))
410             except self.HTTP_ERRORS as e:
411                 self._result = {
412                     'error': e,
413                 }
414             self._usable = ok != False
415             if self._result.get('status_code', None):
416                 # The client worked well enough to get an HTTP status
417                 # code, so presumably any problems are just on the
418                 # server side and it's OK to reuse the client.
419                 self._put_user_agent(curl)
420             else:
421                 # Don't return this client to the pool, in case it's
422                 # broken.
423                 curl.close()
424             if not ok:
425                 _logger.debug("Request fail: GET %s => %s: %s",
426                               url, type(self._result['error']), str(self._result['error']))
427                 return None
428             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
429                          self._result['status_code'],
430                          len(self._result['body']),
431                          t.msecs,
432                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
433             if self.download_counter:
434                 self.download_counter.add(len(self._result['body']))
435             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
436             if resp_md5 != locator.md5sum:
437                 _logger.warning("Checksum fail: md5(%s) = %s",
438                                 url, resp_md5)
439                 self._result['error'] = arvados.errors.HttpError(
440                     0, 'Checksum fail')
441                 return None
442             return self._result['body']
443
444         def put(self, hash_s, body, timeout=None):
445             url = self.root + hash_s
446             _logger.debug("Request: PUT %s", url)
447             curl = self._get_user_agent()
448             ok = None
449             try:
450                 with timer.Timer() as t:
451                     self._headers = {}
452                     body_reader = cStringIO.StringIO(body)
453                     response_body = cStringIO.StringIO()
454                     curl.setopt(pycurl.NOSIGNAL, 1)
455                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
456                     curl.setopt(pycurl.URL, url.encode('utf-8'))
457                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
458                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
459                     # response) instead of sending the request body immediately.
460                     # This allows the server to reject the request if the request
461                     # is invalid or the server is read-only, without waiting for
462                     # the client to send the entire block.
463                     curl.setopt(pycurl.UPLOAD, True)
464                     curl.setopt(pycurl.INFILESIZE, len(body))
465                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
466                     curl.setopt(pycurl.HTTPHEADER, [
467                         '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
468                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
469                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
470                     self._setcurltimeouts(curl, timeout)
471                     try:
472                         curl.perform()
473                     except Exception as e:
474                         raise arvados.errors.HttpError(0, str(e))
475                     self._result = {
476                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
477                         'body': response_body.getvalue(),
478                         'headers': self._headers,
479                         'error': False,
480                     }
481                 ok = retry.check_http_response_success(self._result['status_code'])
482                 if not ok:
483                     self._result['error'] = arvados.errors.HttpError(
484                         self._result['status_code'],
485                         self._headers.get('x-status-line', 'Error'))
486             except self.HTTP_ERRORS as e:
487                 self._result = {
488                     'error': e,
489                 }
490             self._usable = ok != False # still usable if ok is True or None
491             if self._result.get('status_code', None):
492                 # Client is functional. See comment in get().
493                 self._put_user_agent(curl)
494             else:
495                 curl.close()
496             if not ok:
497                 _logger.debug("Request fail: PUT %s => %s: %s",
498                               url, type(self._result['error']), str(self._result['error']))
499                 return False
500             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
501                          self._result['status_code'],
502                          len(body),
503                          t.msecs,
504                          (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
505             if self.upload_counter:
506                 self.upload_counter.add(len(body))
507             return True
508
509         def _setcurltimeouts(self, curl, timeouts):
510             if not timeouts:
511                 return
512             elif isinstance(timeouts, tuple):
513                 if len(timeouts) == 2:
514                     conn_t, xfer_t = timeouts
515                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
516                 else:
517                     conn_t, xfer_t, bandwidth_bps = timeouts
518             else:
519                 conn_t, xfer_t = (timeouts, timeouts)
520                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
521             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
522             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
523             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
524
525         def _headerfunction(self, header_line):
526             header_line = header_line.decode('iso-8859-1')
527             if ':' in header_line:
528                 name, value = header_line.split(':', 1)
529                 name = name.strip().lower()
530                 value = value.strip()
531             elif self._headers:
532                 name = self._lastheadername
533                 value = self._headers[name] + ' ' + header_line.strip()
534             elif header_line.startswith('HTTP/'):
535                 name = 'x-status-line'
536                 value = header_line
537             else:
538                 _logger.error("Unexpected header line: %s", header_line)
539                 return
540             self._lastheadername = name
541             self._headers[name] = value
542             # Returning None implies all bytes were written
543
544
545     class KeepWriterThread(threading.Thread):
546         """
547         Write a blob of data to the given Keep server. On success, call
548         save_response() of the given ThreadLimiter to save the returned
549         locator.
550         """
551         def __init__(self, keep_service, **kwargs):
552             super(KeepClient.KeepWriterThread, self).__init__()
553             self.service = keep_service
554             self.args = kwargs
555             self._success = False
556
557         def success(self):
558             return self._success
559
560         def run(self):
561             limiter = self.args['thread_limiter']
562             sequence = self.args['thread_sequence']
563             if sequence is not None:
564                 limiter.set_sequence(sequence)
565             with limiter:
566                 if not limiter.shall_i_proceed():
567                     # My turn arrived, but the job has been done without
568                     # me.
569                     return
570                 self.run_with_limiter(limiter)
571
572         def run_with_limiter(self, limiter):
573             if self.service.finished():
574                 return
575             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
576                           str(threading.current_thread()),
577                           self.args['data_hash'],
578                           len(self.args['data']),
579                           self.args['service_root'])
580             self._success = bool(self.service.put(
581                 self.args['data_hash'],
582                 self.args['data'],
583                 timeout=self.args.get('timeout', None)))
584             result = self.service.last_result()
585             if self._success:
586                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
587                               str(threading.current_thread()),
588                               self.args['data_hash'],
589                               len(self.args['data']),
590                               self.args['service_root'])
591                 # Tick the 'done' counter for the number of replica
592                 # reported stored by the server, for the case that
593                 # we're talking to a proxy or other backend that
594                 # stores to multiple copies for us.
595                 try:
596                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
597                 except (KeyError, ValueError):
598                     replicas_stored = 1
599                 limiter.save_response(result['body'].strip(), replicas_stored)
600             elif result.get('status_code', None):
601                 _logger.debug("Request fail: PUT %s => %s %s",
602                               self.args['data_hash'],
603                               result['status_code'],
604                               result['body'])
605
606
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :block_cache:
          KeepBlockCache to use for get() results.  If None, a new
          default-sized cache is created.

        :session:
          NOTE(review): accepted for backward compatibility but not
          referenced anywhere in this initializer -- confirm before use.
        """
        self.lock = threading.Lock()
        # Fall back to Arvados configuration / environment for any
        # settings the caller did not specify explicitly.
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable pycurl handles shared by this client's
        # KeepService instances.
        self._user_agent_pool = Queue.LifoQueue()
        # Aggregate transfer/request statistics for this client.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            # Local-store mode bypasses Keep entirely: get/put are
            # replaced with file reads/writes in the named directory
            # (primarily for testing).
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                # With a proxy the service list is static: one synthetic
                # "proxy" service, never refreshed from the API server.
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                # Service lists are discovered lazily by
                # build_services_list().
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
721
722     def current_timeout(self, attempt_number):
723         """Return the appropriate timeout to use for this client.
724
725         The proxy timeout setting if the backend service is currently a proxy,
726         the regular timeout setting otherwise.  The `attempt_number` indicates
727         how many times the operation has been tried already (starting from 0
728         for the first try), and scales the connection timeout portion of the
729         return value accordingly.
730
731         """
732         # TODO(twp): the timeout should be a property of a
733         # KeepService, not a KeepClient. See #4488.
734         t = self.proxy_timeout if self.using_proxy else self.timeout
735         if len(t) == 2:
736             return (t[0] * (1 << attempt_number), t[1])
737         else:
738             return (t[0] * (1 << attempt_number), t[1], t[2])
739     def _any_nondisk_services(self, service_list):
740         return any(ks.get('service_type', 'disk') != 'disk'
741                    for ks in service_list)
742
    def build_services_list(self, force_rebuild=False):
        """Fetch and cache the list of accessible Keep services.

        Populates self._gateway_services, self._keep_services,
        self._writable_services, and self.max_replicas_per_service from
        the API server.  Does nothing when the client was configured
        with a static (proxy) service list, or when a cached list
        already exists and force_rebuild is false.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Read candidates exclude gateway-only services (those are
            # reachable only via +K@uuid hints); write candidates
            # additionally exclude read-only services.
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
785
786     def _service_weight(self, data_hash, service_uuid):
787         """Compute the weight of a Keep service endpoint for a data
788         block with a known hash.
789
790         The weight is md5(h + u) where u is the last 15 characters of
791         the service endpoint's UUID.
792         """
793         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
794
795     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
796         """Return an array of Keep service endpoints, in the order in
797         which they should be probed when reading or writing data with
798         the given hash+hints.
799         """
800         self.build_services_list(force_rebuild)
801
802         sorted_roots = []
803         # Use the services indicated by the given +K@... remote
804         # service hints, if any are present and can be resolved to a
805         # URI.
806         for hint in locator.hints:
807             if hint.startswith('K@'):
808                 if len(hint) == 7:
809                     sorted_roots.append(
810                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
811                 elif len(hint) == 29:
812                     svc = self._gateway_services.get(hint[2:])
813                     if svc:
814                         sorted_roots.append(svc['_service_root'])
815
816         # Sort the available local services by weight (heaviest first)
817         # for this locator, and return their service_roots (base URIs)
818         # in that order.
819         use_services = self._keep_services
820         if need_writable:
821             use_services = self._writable_services
822         self.using_proxy = self._any_nondisk_services(use_services)
823         sorted_roots.extend([
824             svc['_service_root'] for svc in sorted(
825                 use_services,
826                 reverse=True,
827                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
828         _logger.debug("{}: {}".format(locator, sorted_roots))
829         return sorted_roots
830
831     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
832         # roots_map is a dictionary, mapping Keep service root strings
833         # to KeepService objects.  Poll for Keep services, and add any
834         # new ones to roots_map.  Return the current list of local
835         # root strings.
836         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
837         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
838         for root in local_roots:
839             if root not in roots_map:
840                 roots_map[root] = self.KeepService(
841                     root, self._user_agent_pool,
842                     upload_counter=self.upload_counter,
843                     download_counter=self.download_counter,
844                     **headers)
845         return local_roots
846
847     @staticmethod
848     def _check_loop_result(result):
849         # KeepClient RetryLoops should save results as a 2-tuple: the
850         # actual result of the request, and the number of servers available
851         # to receive the request this round.
852         # This method returns True if there's a real result, False if
853         # there are no more servers available, otherwise None.
854         if isinstance(result, Exception):
855             return None
856         result, tried_server_count = result
857         if (result is not None) and (result is not False):
858             return True
859         elif tried_server_count < 1:
860             _logger.info("No more Keep services to try; giving up")
861             return False
862         else:
863             return None
864
865     def get_from_cache(self, loc):
866         """Fetch a block only if is in the cache, otherwise return None."""
867         slot = self.block_cache.get(loc)
868         if slot is not None and slot.ready.is_set():
869             return slot.get()
870         else:
871             return None
872
873     @retry.retry_method
874     def get(self, loc_s, num_retries=None):
875         """Get data from Keep.
876
877         This method fetches one or more blocks of data from Keep.  It
878         sends a request each Keep service registered with the API
879         server (or the proxy provided when this client was
880         instantiated), then each service named in location hints, in
881         sequence.  As soon as one service provides the data, it's
882         returned.
883
884         Arguments:
885         * loc_s: A string of one or more comma-separated locators to fetch.
886           This method returns the concatenation of these blocks.
887         * num_retries: The number of times to retry GET requests to
888           *each* Keep server if it returns temporary failures, with
889           exponential backoff.  Note that, in each loop, the method may try
890           to fetch data from every available Keep service, along with any
891           that are named in location hints in the locator.  The default value
892           is set when the KeepClient is initialized.
893         """
894         if ',' in loc_s:
895             return ''.join(self.get(x) for x in loc_s.split(','))
896
897         self.get_counter.add(1)
898
899         locator = KeepLocator(loc_s)
900         slot, first = self.block_cache.reserve_cache(locator.md5sum)
901         if not first:
902             self.hits_counter.add(1)
903             v = slot.get()
904             return v
905
906         self.misses_counter.add(1)
907
908         # If the locator has hints specifying a prefix (indicating a
909         # remote keepproxy) or the UUID of a local gateway service,
910         # read data from the indicated service(s) instead of the usual
911         # list of local disk services.
912         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
913                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
914         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
915                            for hint in locator.hints if (
916                                    hint.startswith('K@') and
917                                    len(hint) == 29 and
918                                    self._gateway_services.get(hint[2:])
919                                    )])
920         # Map root URLs to their KeepService objects.
921         roots_map = {
922             root: self.KeepService(root, self._user_agent_pool,
923                                    upload_counter=self.upload_counter,
924                                    download_counter=self.download_counter)
925             for root in hint_roots
926         }
927
928         # See #3147 for a discussion of the loop implementation.  Highlights:
929         # * Refresh the list of Keep services after each failure, in case
930         #   it's being updated.
931         # * Retry until we succeed, we're out of retries, or every available
932         #   service has returned permanent failure.
933         sorted_roots = []
934         roots_map = {}
935         blob = None
936         loop = retry.RetryLoop(num_retries, self._check_loop_result,
937                                backoff_start=2)
938         for tries_left in loop:
939             try:
940                 sorted_roots = self.map_new_services(
941                     roots_map, locator,
942                     force_rebuild=(tries_left < num_retries),
943                     need_writable=False)
944             except Exception as error:
945                 loop.save_result(error)
946                 continue
947
948             # Query KeepService objects that haven't returned
949             # permanent failure, in our specified shuffle order.
950             services_to_try = [roots_map[root]
951                                for root in sorted_roots
952                                if roots_map[root].usable()]
953             for keep_service in services_to_try:
954                 blob = keep_service.get(locator, timeout=self.current_timeout(num_retries-tries_left))
955                 if blob is not None:
956                     break
957             loop.save_result((blob, len(services_to_try)))
958
959         # Always cache the result, then return it if we succeeded.
960         slot.set(blob)
961         self.block_cache.cap_cache()
962         if loop.success():
963             return blob
964
965         # Q: Including 403 is necessary for the Keep tests to continue
966         # passing, but maybe they should expect KeepReadError instead?
967         not_founds = sum(1 for key in sorted_roots
968                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
969         service_errors = ((key, roots_map[key].last_result()['error'])
970                           for key in sorted_roots)
971         if not roots_map:
972             raise arvados.errors.KeepReadError(
973                 "failed to read {}: no Keep services available ({})".format(
974                     loc_s, loop.last_result()))
975         elif not_founds == len(sorted_roots):
976             raise arvados.errors.NotFoundError(
977                 "{} not found".format(loc_s), service_errors)
978         else:
979             raise arvados.errors.KeepReadError(
980                 "failed to read {}".format(loc_s), service_errors, label="service")
981
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        # Python 2: accept unicode only if it is ASCII-encodable; Keep
        # stores byte strings.
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        # Zero (or negative) requested copies: nothing to upload, but
        # the locator is still well-defined.
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # done counts replicas confirmed written across all loop rounds.
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # The limiter caps concurrent writers at the number of
            # copies still needed (per-service replica limit permitting).
            thread_limiter = KeepClient.ThreadLimiter(
                copies - done, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                # Skip services that already accepted or permanently
                # rejected this block in an earlier round.
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            done += thread_limiter.done()
            # Success for this round means enough total replicas; the
            # server count lets _check_loop_result detect exhaustion.
            loop.save_result((done >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
1066
1067     def local_store_put(self, data, copies=1, num_retries=None):
1068         """A stub for put().
1069
1070         This method is used in place of the real put() method when
1071         using local storage (see constructor's local_store argument).
1072
1073         copies and num_retries arguments are ignored: they are here
1074         only for the sake of offering the same call signature as
1075         put().
1076
1077         Data stored this way can be retrieved via local_store_get().
1078         """
1079         md5 = hashlib.md5(data).hexdigest()
1080         locator = '%s+%d' % (md5, len(data))
1081         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1082             f.write(data)
1083         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1084                   os.path.join(self.local_store, md5))
1085         return locator
1086
1087     def local_store_get(self, loc_s, num_retries=None):
1088         """Companion to local_store_put()."""
1089         try:
1090             locator = KeepLocator(loc_s)
1091         except ValueError:
1092             raise arvados.errors.NotFoundError(
1093                 "Invalid data locator: '%s'" % loc_s)
1094         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1095             return ''
1096         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1097             return f.read()
1098
1099     def is_cached(self, locator):
1100         return self.block_cache.reserve_cache(expect_hash)