Merge branch '9408-arvbox-dockerfile-pulls-unnecessary-packages'
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import threading
13 import timer
14
15 import arvados
16 import arvados.config as config
17 import arvados.errors
18 import arvados.retry as retry
19 import arvados.util
20
# Module-level logger for all Keep client activity in this module.
_logger = logging.getLogger('arvados.keep')
# Lazily-built singleton KeepClient used by the deprecated Keep class below.
global_client_object = None
23
24
class KeepLocator(object):
    """Parse and manipulate a Keep block locator string.

    A locator has the form "md5sum[+size][+hint...]"; each hint is an
    alphanumeric token beginning with a capital letter.  A permission
    hint "A<signature>@<hex timestamp>" is parsed into its signature
    and expiry components; all other hints are stored verbatim in
    self.hints.
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare hash with no size (and therefore no hints either).
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the locator, omitting components that are unset.
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed ("md5sum[+size]")."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # Accepts 1 to 8 hex digits (a Unix timestamp in hex).
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<hex expiry>" hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Split an "A<sig>@<expiry>" hint into perm_sig and perm_expiry.

        Raises ValueError with a consistent message when the '@'
        separator is missing.  (The previous code caught IndexError,
        but a missing '@' actually surfaced as a tuple-unpacking
        ValueError with an unhelpful message, so the intended error
        text was never produced.)
        """
        sig, sep, expiry = s[1:].partition('@')
        if not sep:
            raise ValueError("bad permission hint {}".format(s))
        # The property setters validate each piece (hex format/length).
        self.perm_sig = sig
        self.perm_expiry = expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired (as of now,
        or as of the given datetime)."""
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.now()
        return self.perm_expiry <= as_of_dt
106
107
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Instantiate your own KeepClient with your
    own API client instead.  The global KeepClient builds its API client
    from the current Arvados configuration, which may not match yours.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # KeepClient used to adapt to configuration changes at runtime.
        # Simulate that here: rebuild the client whenever any relevant
        # setting differs from the one it was last built with.
        current_key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is None or current_key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = current_key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block via the shared global KeepClient."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block via the shared global KeepClient."""
        return Keep.global_client_object().put(data, **kwargs)
142
class KeepBlockCache(object):
    """In-memory LRU cache of Keep block contents, keyed by locator.

    The cache is a list of CacheSlot objects, most-recently-used first.
    Slots are created empty (via reserve_cache) so that concurrent
    readers of the same block can wait on a single fetch.
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block; readers wait on `ready` until the
        content has been filled in by set()."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            # Block until a writer calls set(), then return the content.
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Drop slots where ready is set but content is None: those
            # represent failed block reads.
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least-recently-used slot whose content has
                # arrived; pending slots must stay so readers can finish.
                for i in range(len(self._cache) - 1, -1, -1):
                    if self._cache[i].ready.is_set():
                        total -= self._cache[i].size()
                        del self._cache[i]
                        break
                else:
                    # No evictable slot right now.  The previous code
                    # would busy-loop here (possible when CacheSlot.set()
                    # has assigned content but not yet set the event);
                    # give up instead and let a later cap_cache() retry.
                    break

    def _get(self, locator):
        # Linear scan; a hit moves the slot to the front (MRU position).
        # Caller must hold self._cache_lock.
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if absent."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            # Add a new cache slot for the locator
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
            return n, True
214
class Counter(object):
    """A thread-safe additive counter."""

    def __init__(self, v=0):
        # One lock guards every read and write so concurrent add()
        # calls never lose an update.
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Atomically add v (may be negative) to the counter."""
        with self._lk:
            self._val += v

    def get(self):
        """Return the current counter value."""
        with self._lk:
            return self._val
227
228
class KeepClient(object):
    """Client for storing and retrieving data blocks in Keep services."""

    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    # Each tuple is (connection timeout, read timeout, minimum bandwidth).
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
239
240     class ThreadLimiter(object):
241         """Limit the number of threads writing to Keep at once.
242
243         This ensures that only a number of writer threads that could
244         potentially achieve the desired replication level run at once.
245         Once the desired replication level is achieved, queued threads
246         are instructed not to run.
247
248         Should be used in a "with" block.
249         """
250         def __init__(self, want_copies, max_service_replicas):
251             self._started = 0
252             self._want_copies = want_copies
253             self._done = 0
254             self._response = None
255             self._start_lock = threading.Condition()
256             if (not max_service_replicas) or (max_service_replicas >= want_copies):
257                 max_threads = 1
258             else:
259                 max_threads = math.ceil(float(want_copies) / max_service_replicas)
260             _logger.debug("Limiter max threads is %d", max_threads)
261             self._todo_lock = threading.Semaphore(max_threads)
262             self._done_lock = threading.Lock()
263             self._local = threading.local()
264
265         def __enter__(self):
266             self._start_lock.acquire()
267             if getattr(self._local, 'sequence', None) is not None:
268                 # If the calling thread has used set_sequence(N), then
269                 # we wait here until N other threads have started.
270                 while self._started < self._local.sequence:
271                     self._start_lock.wait()
272             self._todo_lock.acquire()
273             self._started += 1
274             self._start_lock.notifyAll()
275             self._start_lock.release()
276             return self
277
278         def __exit__(self, type, value, traceback):
279             self._todo_lock.release()
280
281         def set_sequence(self, sequence):
282             self._local.sequence = sequence
283
284         def shall_i_proceed(self):
285             """
286             Return true if the current thread should write to Keep.
287             Return false otherwise.
288             """
289             with self._done_lock:
290                 return (self._done < self._want_copies)
291
292         def save_response(self, response_body, replicas_stored):
293             """
294             Records a response body (a locator, possibly signed) returned by
295             the Keep server, and the number of replicas it stored.
296             """
297             with self._done_lock:
298                 self._done += replicas_stored
299                 self._response = response_body
300
301         def response(self):
302             """Return the body from the response to a PUT request."""
303             with self._done_lock:
304                 return self._response
305
306         def done(self):
307             """Return the total number of replicas successfully stored."""
308             with self._done_lock:
309                 return self._done
310
311     class KeepService(object):
312         """Make requests to a single Keep service, and track results.
313
314         A KeepService is intended to last long enough to perform one
315         transaction (GET or PUT) against one Keep service. This can
316         involve calling either get() or put() multiple times in order
317         to retry after transient failures. However, calling both get()
318         and put() on a single instance -- or using the same instance
319         to access two different Keep services -- will not produce
320         sensible behavior.
321         """
322
323         HTTP_ERRORS = (
324             socket.error,
325             ssl.SSLError,
326             arvados.errors.HttpError,
327         )
328
329         def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
330                      upload_counter=None,
331                      download_counter=None, **headers):
332             self.root = root
333             self._user_agent_pool = user_agent_pool
334             self._result = {'error': None}
335             self._usable = True
336             self._session = None
337             self.get_headers = {'Accept': 'application/octet-stream'}
338             self.get_headers.update(headers)
339             self.put_headers = headers
340             self.upload_counter = upload_counter
341             self.download_counter = download_counter
342
343         def usable(self):
344             """Is it worth attempting a request?"""
345             return self._usable
346
347         def finished(self):
348             """Did the request succeed or encounter permanent failure?"""
349             return self._result['error'] == False or not self._usable
350
351         def last_result(self):
352             return self._result
353
354         def _get_user_agent(self):
355             try:
356                 return self._user_agent_pool.get(False)
357             except Queue.Empty:
358                 return pycurl.Curl()
359
360         def _put_user_agent(self, ua):
361             try:
362                 ua.reset()
363                 self._user_agent_pool.put(ua, False)
364             except:
365                 ua.close()
366
367         @staticmethod
368         def _socket_open(family, socktype, protocol, address=None):
369             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
370             s = socket.socket(family, socktype, protocol)
371             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
372             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
373             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
374             return s
375
376         def get(self, locator, method="GET", timeout=None):
377             # locator is a KeepLocator object.
378             url = self.root + str(locator)
379             _logger.debug("Request: %s %s", method, url)
380             curl = self._get_user_agent()
381             ok = None
382             try:
383                 with timer.Timer() as t:
384                     self._headers = {}
385                     response_body = cStringIO.StringIO()
386                     curl.setopt(pycurl.NOSIGNAL, 1)
387                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
388                     curl.setopt(pycurl.URL, url.encode('utf-8'))
389                     curl.setopt(pycurl.HTTPHEADER, [
390                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
391                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
392                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
393                     if method == "HEAD":
394                         curl.setopt(pycurl.NOBODY, True)
395                     self._setcurltimeouts(curl, timeout)
396
397                     try:
398                         curl.perform()
399                     except Exception as e:
400                         raise arvados.errors.HttpError(0, str(e))
401                     self._result = {
402                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
403                         'body': response_body.getvalue(),
404                         'headers': self._headers,
405                         'error': False,
406                     }
407
408                 ok = retry.check_http_response_success(self._result['status_code'])
409                 if not ok:
410                     self._result['error'] = arvados.errors.HttpError(
411                         self._result['status_code'],
412                         self._headers.get('x-status-line', 'Error'))
413             except self.HTTP_ERRORS as e:
414                 self._result = {
415                     'error': e,
416                 }
417             self._usable = ok != False
418             if self._result.get('status_code', None):
419                 # The client worked well enough to get an HTTP status
420                 # code, so presumably any problems are just on the
421                 # server side and it's OK to reuse the client.
422                 self._put_user_agent(curl)
423             else:
424                 # Don't return this client to the pool, in case it's
425                 # broken.
426                 curl.close()
427             if not ok:
428                 _logger.debug("Request fail: GET %s => %s: %s",
429                               url, type(self._result['error']), str(self._result['error']))
430                 return None
431             if method == "HEAD":
432                 _logger.info("HEAD %s: %s bytes",
433                          self._result['status_code'],
434                          self._result.get('content-length'))
435                 return True
436
437             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
438                          self._result['status_code'],
439                          len(self._result['body']),
440                          t.msecs,
441                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
442
443             if self.download_counter:
444                 self.download_counter.add(len(self._result['body']))
445             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
446             if resp_md5 != locator.md5sum:
447                 _logger.warning("Checksum fail: md5(%s) = %s",
448                                 url, resp_md5)
449                 self._result['error'] = arvados.errors.HttpError(
450                     0, 'Checksum fail')
451                 return None
452             return self._result['body']
453
454         def put(self, hash_s, body, timeout=None):
455             url = self.root + hash_s
456             _logger.debug("Request: PUT %s", url)
457             curl = self._get_user_agent()
458             ok = None
459             try:
460                 with timer.Timer() as t:
461                     self._headers = {}
462                     body_reader = cStringIO.StringIO(body)
463                     response_body = cStringIO.StringIO()
464                     curl.setopt(pycurl.NOSIGNAL, 1)
465                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
466                     curl.setopt(pycurl.URL, url.encode('utf-8'))
467                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
468                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
469                     # response) instead of sending the request body immediately.
470                     # This allows the server to reject the request if the request
471                     # is invalid or the server is read-only, without waiting for
472                     # the client to send the entire block.
473                     curl.setopt(pycurl.UPLOAD, True)
474                     curl.setopt(pycurl.INFILESIZE, len(body))
475                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
476                     curl.setopt(pycurl.HTTPHEADER, [
477                         '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
478                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
479                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
480                     self._setcurltimeouts(curl, timeout)
481                     try:
482                         curl.perform()
483                     except Exception as e:
484                         raise arvados.errors.HttpError(0, str(e))
485                     self._result = {
486                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
487                         'body': response_body.getvalue(),
488                         'headers': self._headers,
489                         'error': False,
490                     }
491                 ok = retry.check_http_response_success(self._result['status_code'])
492                 if not ok:
493                     self._result['error'] = arvados.errors.HttpError(
494                         self._result['status_code'],
495                         self._headers.get('x-status-line', 'Error'))
496             except self.HTTP_ERRORS as e:
497                 self._result = {
498                     'error': e,
499                 }
500             self._usable = ok != False # still usable if ok is True or None
501             if self._result.get('status_code', None):
502                 # Client is functional. See comment in get().
503                 self._put_user_agent(curl)
504             else:
505                 curl.close()
506             if not ok:
507                 _logger.debug("Request fail: PUT %s => %s: %s",
508                               url, type(self._result['error']), str(self._result['error']))
509                 return False
510             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
511                          self._result['status_code'],
512                          len(body),
513                          t.msecs,
514                          (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
515             if self.upload_counter:
516                 self.upload_counter.add(len(body))
517             return True
518
519         def _setcurltimeouts(self, curl, timeouts):
520             if not timeouts:
521                 return
522             elif isinstance(timeouts, tuple):
523                 if len(timeouts) == 2:
524                     conn_t, xfer_t = timeouts
525                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
526                 else:
527                     conn_t, xfer_t, bandwidth_bps = timeouts
528             else:
529                 conn_t, xfer_t = (timeouts, timeouts)
530                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
531             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
532             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
533             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
534
535         def _headerfunction(self, header_line):
536             header_line = header_line.decode('iso-8859-1')
537             if ':' in header_line:
538                 name, value = header_line.split(':', 1)
539                 name = name.strip().lower()
540                 value = value.strip()
541             elif self._headers:
542                 name = self._lastheadername
543                 value = self._headers[name] + ' ' + header_line.strip()
544             elif header_line.startswith('HTTP/'):
545                 name = 'x-status-line'
546                 value = header_line
547             else:
548                 _logger.error("Unexpected header line: %s", header_line)
549                 return
550             self._lastheadername = name
551             self._headers[name] = value
552             # Returning None implies all bytes were written
553
554
555     class KeepWriterThread(threading.Thread):
556         """
557         Write a blob of data to the given Keep server. On success, call
558         save_response() of the given ThreadLimiter to save the returned
559         locator.
560         """
561         def __init__(self, keep_service, **kwargs):
562             super(KeepClient.KeepWriterThread, self).__init__()
563             self.service = keep_service
564             self.args = kwargs
565             self._success = False
566
567         def success(self):
568             return self._success
569
570         def run(self):
571             limiter = self.args['thread_limiter']
572             sequence = self.args['thread_sequence']
573             if sequence is not None:
574                 limiter.set_sequence(sequence)
575             with limiter:
576                 if not limiter.shall_i_proceed():
577                     # My turn arrived, but the job has been done without
578                     # me.
579                     return
580                 self.run_with_limiter(limiter)
581
582         def run_with_limiter(self, limiter):
583             if self.service.finished():
584                 return
585             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
586                           str(threading.current_thread()),
587                           self.args['data_hash'],
588                           len(self.args['data']),
589                           self.args['service_root'])
590             self._success = bool(self.service.put(
591                 self.args['data_hash'],
592                 self.args['data'],
593                 timeout=self.args.get('timeout', None)))
594             result = self.service.last_result()
595             if self._success:
596                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
597                               str(threading.current_thread()),
598                               self.args['data_hash'],
599                               len(self.args['data']),
600                               self.args['service_root'])
601                 # Tick the 'done' counter for the number of replica
602                 # reported stored by the server, for the case that
603                 # we're talking to a proxy or other backend that
604                 # stores to multiple copies for us.
605                 try:
606                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
607                 except (KeyError, ValueError):
608                     replicas_stored = 1
609                 limiter.save_response(result['body'].strip(), replicas_stored)
610             elif result.get('status_code', None):
611                 _logger.debug("Request fail: PUT %s => %s %s",
612                               self.args['data_hash'],
613                               result['status_code'],
614                               result['body'])
615
616
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.
        """
        # Protects the lazily-built Keep services lists (see
        # build_services_list below).
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            # Ambiguous configuration: the caller supplied both a client
            # (which carries its own token) and a separate token.
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable pycurl handles shared by this client's
        # KeepService instances.
        self._user_agent_pool = Queue.LifoQueue()
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            # Local-store mode: replace get/put with the filesystem-backed
            # versions defined elsewhere in this class.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                # A proxy acts as the one (static) service entry.
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                # Services discovered lazily via build_services_list().
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
731
732     def current_timeout(self, attempt_number):
733         """Return the appropriate timeout to use for this client.
734
735         The proxy timeout setting if the backend service is currently a proxy,
736         the regular timeout setting otherwise.  The `attempt_number` indicates
737         how many times the operation has been tried already (starting from 0
738         for the first try), and scales the connection timeout portion of the
739         return value accordingly.
740
741         """
742         # TODO(twp): the timeout should be a property of a
743         # KeepService, not a KeepClient. See #4488.
744         t = self.proxy_timeout if self.using_proxy else self.timeout
745         if len(t) == 2:
746             return (t[0] * (1 << attempt_number), t[1])
747         else:
748             return (t[0] * (1 << attempt_number), t[1], t[2])
749     def _any_nondisk_services(self, service_list):
750         return any(ks.get('service_type', 'disk') != 'disk'
751                    for ks in service_list)
752
    def build_services_list(self, force_rebuild=False):
        """Populate the cached Keep service lists from the API server.

        No-op when a static (proxy) service list was configured at
        construction time, or when a list is already cached and
        force_rebuild is false.  Otherwise, under self.lock, rebuilds
        self._gateway_services, self._keep_services,
        self._writable_services, and self.max_replicas_per_service.

        Raises arvados.errors.NoKeepServersError if the API server
        reports no services at all.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Gateway-type services are reachable only via their UUID
            # hints, so exclude them from the general probe lists.
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
795
796     def _service_weight(self, data_hash, service_uuid):
797         """Compute the weight of a Keep service endpoint for a data
798         block with a known hash.
799
800         The weight is md5(h + u) where u is the last 15 characters of
801         the service endpoint's UUID.
802         """
803         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
804
805     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
806         """Return an array of Keep service endpoints, in the order in
807         which they should be probed when reading or writing data with
808         the given hash+hints.
809         """
810         self.build_services_list(force_rebuild)
811
812         sorted_roots = []
813         # Use the services indicated by the given +K@... remote
814         # service hints, if any are present and can be resolved to a
815         # URI.
816         for hint in locator.hints:
817             if hint.startswith('K@'):
818                 if len(hint) == 7:
819                     sorted_roots.append(
820                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
821                 elif len(hint) == 29:
822                     svc = self._gateway_services.get(hint[2:])
823                     if svc:
824                         sorted_roots.append(svc['_service_root'])
825
826         # Sort the available local services by weight (heaviest first)
827         # for this locator, and return their service_roots (base URIs)
828         # in that order.
829         use_services = self._keep_services
830         if need_writable:
831             use_services = self._writable_services
832         self.using_proxy = self._any_nondisk_services(use_services)
833         sorted_roots.extend([
834             svc['_service_root'] for svc in sorted(
835                 use_services,
836                 reverse=True,
837                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
838         _logger.debug("{}: {}".format(locator, sorted_roots))
839         return sorted_roots
840
841     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
842         # roots_map is a dictionary, mapping Keep service root strings
843         # to KeepService objects.  Poll for Keep services, and add any
844         # new ones to roots_map.  Return the current list of local
845         # root strings.
846         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
847         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
848         for root in local_roots:
849             if root not in roots_map:
850                 roots_map[root] = self.KeepService(
851                     root, self._user_agent_pool,
852                     upload_counter=self.upload_counter,
853                     download_counter=self.download_counter,
854                     **headers)
855         return local_roots
856
857     @staticmethod
858     def _check_loop_result(result):
859         # KeepClient RetryLoops should save results as a 2-tuple: the
860         # actual result of the request, and the number of servers available
861         # to receive the request this round.
862         # This method returns True if there's a real result, False if
863         # there are no more servers available, otherwise None.
864         if isinstance(result, Exception):
865             return None
866         result, tried_server_count = result
867         if (result is not None) and (result is not False):
868             return True
869         elif tried_server_count < 1:
870             _logger.info("No more Keep services to try; giving up")
871             return False
872         else:
873             return None
874
875     def get_from_cache(self, loc):
876         """Fetch a block only if is in the cache, otherwise return None."""
877         slot = self.block_cache.get(loc)
878         if slot is not None and slot.ready.is_set():
879             return slot.get()
880         else:
881             return None
882
883     @retry.retry_method
884     def head(self, loc_s, num_retries=None):
885         return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
886
887     @retry.retry_method
888     def get(self, loc_s, num_retries=None):
889         return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
890
891     def _get_or_head(self, loc_s, method="GET", num_retries=None):
892         """Get data from Keep.
893
894         This method fetches one or more blocks of data from Keep.  It
895         sends a request each Keep service registered with the API
896         server (or the proxy provided when this client was
897         instantiated), then each service named in location hints, in
898         sequence.  As soon as one service provides the data, it's
899         returned.
900
901         Arguments:
902         * loc_s: A string of one or more comma-separated locators to fetch.
903           This method returns the concatenation of these blocks.
904         * num_retries: The number of times to retry GET requests to
905           *each* Keep server if it returns temporary failures, with
906           exponential backoff.  Note that, in each loop, the method may try
907           to fetch data from every available Keep service, along with any
908           that are named in location hints in the locator.  The default value
909           is set when the KeepClient is initialized.
910         """
911         if ',' in loc_s:
912             return ''.join(self.get(x) for x in loc_s.split(','))
913
914         self.get_counter.add(1)
915
916         locator = KeepLocator(loc_s)
917         if method == "GET":
918             slot, first = self.block_cache.reserve_cache(locator.md5sum)
919             if not first:
920                 self.hits_counter.add(1)
921                 v = slot.get()
922                 return v
923
924         self.misses_counter.add(1)
925
926         # If the locator has hints specifying a prefix (indicating a
927         # remote keepproxy) or the UUID of a local gateway service,
928         # read data from the indicated service(s) instead of the usual
929         # list of local disk services.
930         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
931                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
932         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
933                            for hint in locator.hints if (
934                                    hint.startswith('K@') and
935                                    len(hint) == 29 and
936                                    self._gateway_services.get(hint[2:])
937                                    )])
938         # Map root URLs to their KeepService objects.
939         roots_map = {
940             root: self.KeepService(root, self._user_agent_pool,
941                                    upload_counter=self.upload_counter,
942                                    download_counter=self.download_counter)
943             for root in hint_roots
944         }
945
946         # See #3147 for a discussion of the loop implementation.  Highlights:
947         # * Refresh the list of Keep services after each failure, in case
948         #   it's being updated.
949         # * Retry until we succeed, we're out of retries, or every available
950         #   service has returned permanent failure.
951         sorted_roots = []
952         roots_map = {}
953         blob = None
954         loop = retry.RetryLoop(num_retries, self._check_loop_result,
955                                backoff_start=2)
956         for tries_left in loop:
957             try:
958                 sorted_roots = self.map_new_services(
959                     roots_map, locator,
960                     force_rebuild=(tries_left < num_retries),
961                     need_writable=False)
962             except Exception as error:
963                 loop.save_result(error)
964                 continue
965
966             # Query KeepService objects that haven't returned
967             # permanent failure, in our specified shuffle order.
968             services_to_try = [roots_map[root]
969                                for root in sorted_roots
970                                if roots_map[root].usable()]
971             for keep_service in services_to_try:
972                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
973                 if blob is not None:
974                     break
975             loop.save_result((blob, len(services_to_try)))
976
977         # Always cache the result, then return it if we succeeded.
978         if method == "GET":
979             slot.set(blob)
980             self.block_cache.cap_cache()
981         if loop.success():
982             if method == "HEAD":
983                 return True
984             else:
985                 return blob
986
987         # Q: Including 403 is necessary for the Keep tests to continue
988         # passing, but maybe they should expect KeepReadError instead?
989         not_founds = sum(1 for key in sorted_roots
990                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
991         service_errors = ((key, roots_map[key].last_result()['error'])
992                           for key in sorted_roots)
993         if not roots_map:
994             raise arvados.errors.KeepReadError(
995                 "failed to read {}: no Keep services available ({})".format(
996                     loc_s, loop.last_result()))
997         elif not_founds == len(sorted_roots):
998             raise arvados.errors.NotFoundError(
999                 "{} not found".format(loc_s), service_errors)
1000         else:
1001             raise arvados.errors.KeepReadError(
1002                 "failed to read {}".format(loc_s), service_errors, label="service")
1003
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        # Python 2: Keep blocks are byte strings; accept unicode only
        # when it is ASCII-encodable.
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Nothing to store, but the locator is still well-defined.
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Number of replicas confirmed written so far, across retries.
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Upload in parallel; the ThreadLimiter caps concurrent
            # writers based on how many replicas are still needed.
            thread_limiter = KeepClient.ThreadLimiter(
                copies - done, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    # This service already stored the block on an
                    # earlier round; don't write to it again.
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            done += thread_limiter.done()
            loop.save_result((done >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
1088
1089     def local_store_put(self, data, copies=1, num_retries=None):
1090         """A stub for put().
1091
1092         This method is used in place of the real put() method when
1093         using local storage (see constructor's local_store argument).
1094
1095         copies and num_retries arguments are ignored: they are here
1096         only for the sake of offering the same call signature as
1097         put().
1098
1099         Data stored this way can be retrieved via local_store_get().
1100         """
1101         md5 = hashlib.md5(data).hexdigest()
1102         locator = '%s+%d' % (md5, len(data))
1103         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1104             f.write(data)
1105         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1106                   os.path.join(self.local_store, md5))
1107         return locator
1108
1109     def local_store_get(self, loc_s, num_retries=None):
1110         """Companion to local_store_put()."""
1111         try:
1112             locator = KeepLocator(loc_s)
1113         except ValueError:
1114             raise arvados.errors.NotFoundError(
1115                 "Invalid data locator: '%s'" % loc_s)
1116         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1117             return ''
1118         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1119             return f.read()
1120
1121     def is_cached(self, locator):
1122         return self.block_cache.reserve_cache(expect_hash)