Merge branch '9570-cwl-v1.0' closes #9570
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import threading
13 import timer
14
15 import arvados
16 import arvados.config as config
17 import arvados.errors
18 import arvados.retry as retry
19 import arvados.util
20
# Logger shared by everything in this module.
_logger = logging.getLogger('arvados.keep')
# Lazily-built KeepClient shared via the deprecated Keep class below.
global_client_object = None
23
24
class KeepLocator(object):
    """Parse, validate, and reassemble one Keep block locator string.

    A locator has the form "<md5sum>[+<size>][+<hint>...]"; an "A" hint
    carries a permission signature and a hex Unix timestamp expiry
    ("A<sig>@<hex timestamp>").
    """

    # Reference point for turning perm_expiry datetimes back into Unix
    # timestamps in permission_hint().
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # Hints are one capital letter followed by alphanumerics/@/_/-.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            # NOTE: a non-numeric second field raises ValueError here;
            # locators are expected to carry the size right after the hash.
            self.size = int(next(pieces))
        except StopIteration:
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed ("md5sum[+size]")."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<hex expiry>" hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an "A<sig>@<hex expiry>" hint into perm_sig/perm_expiry.

        Raises ValueError if the hint is malformed.
        """
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # A hint without '@' makes the tuple unpacking raise
            # ValueError, not IndexError -- the original only caught
            # IndexError, so malformed hints escaped with an unhelpful
            # "need more than 1 value to unpack" message.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """True if the permission signature has expired as of as_of_dt."""
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is derived from a UTC timestamp (utcfromtimestamp
            # above), so compare against UTC "now"; the original used
            # local-time now(), which is off by the UTC offset elsewhere.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
106
107
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # KeepClient used to adjust its behavior at runtime whenever these
        # settings changed.  Emulate that by rebuilding the shared client
        # whenever the relevant configuration differs from last time.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is None or cls._last_key != key:
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block via the shared global KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block via the shared global KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
142
class KeepBlockCache(object):
    """In-memory LRU cache of Keep block contents, bounded by total bytes.

    Slots are kept in most-recently-used-first order; cap_cache() evicts
    ready slots from the least recently used end until the total content
    size is at most cache_max.
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block's content, fillable from another thread."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until the slot is filled, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Fill the slot and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Bytes of content held (0 while unfilled or on error)."""
            return 0 if self.content is None else len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Drop slots that finished with an error (ready but content is
            # None): they hold no data worth keeping.
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least recently used slot that is ready.
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        del self._cache[i]
                        break
                else:
                    # No evictable slot right now: bail out instead of
                    # spinning (the original re-ran this loop forever if
                    # every slot was still pending).
                    break
                total = sum(slot.size() for slot in self._cache)

    def _get(self, locator):
        # Test if the locator is already in the cache.  Caller must hold
        # self._cache_lock.
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # Move it to the front (most recently used).
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None if absent."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            # Add a new, unfilled cache slot for the locator.
            n = KeepBlockCache.CacheSlot(locator)
            self._cache.insert(0, n)
            return n, True
214
class Counter(object):
    """An integer counter that can be updated and read from many threads."""

    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Atomically increase the counter by v (v may be negative)."""
        with self._lk:
            self._val = self._val + v

    def get(self):
        """Return the current counter value."""
        with self._lk:
            return self._val
227
228
229 class KeepClient(object):
230
231     # Default Keep server connection timeout:  2 seconds
232     # Default Keep server read timeout:       256 seconds
233     # Default Keep server bandwidth minimum:  32768 bytes per second
234     # Default Keep proxy connection timeout:  20 seconds
235     # Default Keep proxy read timeout:        256 seconds
236     # Default Keep proxy bandwidth minimum:   32768 bytes per second
237     DEFAULT_TIMEOUT = (2, 256, 32768)
238     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
239
240
241     class KeepService(object):
242         """Make requests to a single Keep service, and track results.
243
244         A KeepService is intended to last long enough to perform one
245         transaction (GET or PUT) against one Keep service. This can
246         involve calling either get() or put() multiple times in order
247         to retry after transient failures. However, calling both get()
248         and put() on a single instance -- or using the same instance
249         to access two different Keep services -- will not produce
250         sensible behavior.
251         """
252
253         HTTP_ERRORS = (
254             socket.error,
255             ssl.SSLError,
256             arvados.errors.HttpError,
257         )
258
259         def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
260                      upload_counter=None,
261                      download_counter=None, **headers):
262             self.root = root
263             self._user_agent_pool = user_agent_pool
264             self._result = {'error': None}
265             self._usable = True
266             self._session = None
267             self.get_headers = {'Accept': 'application/octet-stream'}
268             self.get_headers.update(headers)
269             self.put_headers = headers
270             self.upload_counter = upload_counter
271             self.download_counter = download_counter
272
273         def usable(self):
274             """Is it worth attempting a request?"""
275             return self._usable
276
277         def finished(self):
278             """Did the request succeed or encounter permanent failure?"""
279             return self._result['error'] == False or not self._usable
280
281         def last_result(self):
282             return self._result
283
284         def _get_user_agent(self):
285             try:
286                 return self._user_agent_pool.get(False)
287             except Queue.Empty:
288                 return pycurl.Curl()
289
290         def _put_user_agent(self, ua):
291             try:
292                 ua.reset()
293                 self._user_agent_pool.put(ua, False)
294             except:
295                 ua.close()
296
297         @staticmethod
298         def _socket_open(family, socktype, protocol, address=None):
299             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
300             s = socket.socket(family, socktype, protocol)
301             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
302             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
303             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
304             return s
305
306         def get(self, locator, method="GET", timeout=None):
307             # locator is a KeepLocator object.
308             url = self.root + str(locator)
309             _logger.debug("Request: %s %s", method, url)
310             curl = self._get_user_agent()
311             ok = None
312             try:
313                 with timer.Timer() as t:
314                     self._headers = {}
315                     response_body = cStringIO.StringIO()
316                     curl.setopt(pycurl.NOSIGNAL, 1)
317                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
318                     curl.setopt(pycurl.URL, url.encode('utf-8'))
319                     curl.setopt(pycurl.HTTPHEADER, [
320                         '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
321                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
322                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
323                     if method == "HEAD":
324                         curl.setopt(pycurl.NOBODY, True)
325                     self._setcurltimeouts(curl, timeout)
326
327                     try:
328                         curl.perform()
329                     except Exception as e:
330                         raise arvados.errors.HttpError(0, str(e))
331                     self._result = {
332                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
333                         'body': response_body.getvalue(),
334                         'headers': self._headers,
335                         'error': False,
336                     }
337
338                 ok = retry.check_http_response_success(self._result['status_code'])
339                 if not ok:
340                     self._result['error'] = arvados.errors.HttpError(
341                         self._result['status_code'],
342                         self._headers.get('x-status-line', 'Error'))
343             except self.HTTP_ERRORS as e:
344                 self._result = {
345                     'error': e,
346                 }
347             self._usable = ok != False
348             if self._result.get('status_code', None):
349                 # The client worked well enough to get an HTTP status
350                 # code, so presumably any problems are just on the
351                 # server side and it's OK to reuse the client.
352                 self._put_user_agent(curl)
353             else:
354                 # Don't return this client to the pool, in case it's
355                 # broken.
356                 curl.close()
357             if not ok:
358                 _logger.debug("Request fail: GET %s => %s: %s",
359                               url, type(self._result['error']), str(self._result['error']))
360                 return None
361             if method == "HEAD":
362                 _logger.info("HEAD %s: %s bytes",
363                          self._result['status_code'],
364                          self._result.get('content-length'))
365                 return True
366
367             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
368                          self._result['status_code'],
369                          len(self._result['body']),
370                          t.msecs,
371                          (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)
372
373             if self.download_counter:
374                 self.download_counter.add(len(self._result['body']))
375             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
376             if resp_md5 != locator.md5sum:
377                 _logger.warning("Checksum fail: md5(%s) = %s",
378                                 url, resp_md5)
379                 self._result['error'] = arvados.errors.HttpError(
380                     0, 'Checksum fail')
381                 return None
382             return self._result['body']
383
384         def put(self, hash_s, body, timeout=None):
385             url = self.root + hash_s
386             _logger.debug("Request: PUT %s", url)
387             curl = self._get_user_agent()
388             ok = None
389             try:
390                 with timer.Timer() as t:
391                     self._headers = {}
392                     body_reader = cStringIO.StringIO(body)
393                     response_body = cStringIO.StringIO()
394                     curl.setopt(pycurl.NOSIGNAL, 1)
395                     curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
396                     curl.setopt(pycurl.URL, url.encode('utf-8'))
397                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
398                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
399                     # response) instead of sending the request body immediately.
400                     # This allows the server to reject the request if the request
401                     # is invalid or the server is read-only, without waiting for
402                     # the client to send the entire block.
403                     curl.setopt(pycurl.UPLOAD, True)
404                     curl.setopt(pycurl.INFILESIZE, len(body))
405                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
406                     curl.setopt(pycurl.HTTPHEADER, [
407                         '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
408                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
409                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
410                     self._setcurltimeouts(curl, timeout)
411                     try:
412                         curl.perform()
413                     except Exception as e:
414                         raise arvados.errors.HttpError(0, str(e))
415                     self._result = {
416                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
417                         'body': response_body.getvalue(),
418                         'headers': self._headers,
419                         'error': False,
420                     }
421                 ok = retry.check_http_response_success(self._result['status_code'])
422                 if not ok:
423                     self._result['error'] = arvados.errors.HttpError(
424                         self._result['status_code'],
425                         self._headers.get('x-status-line', 'Error'))
426             except self.HTTP_ERRORS as e:
427                 self._result = {
428                     'error': e,
429                 }
430             self._usable = ok != False # still usable if ok is True or None
431             if self._result.get('status_code', None):
432                 # Client is functional. See comment in get().
433                 self._put_user_agent(curl)
434             else:
435                 curl.close()
436             if not ok:
437                 _logger.debug("Request fail: PUT %s => %s: %s",
438                               url, type(self._result['error']), str(self._result['error']))
439                 return False
440             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
441                          self._result['status_code'],
442                          len(body),
443                          t.msecs,
444                          (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
445             if self.upload_counter:
446                 self.upload_counter.add(len(body))
447             return True
448
449         def _setcurltimeouts(self, curl, timeouts):
450             if not timeouts:
451                 return
452             elif isinstance(timeouts, tuple):
453                 if len(timeouts) == 2:
454                     conn_t, xfer_t = timeouts
455                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
456                 else:
457                     conn_t, xfer_t, bandwidth_bps = timeouts
458             else:
459                 conn_t, xfer_t = (timeouts, timeouts)
460                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
461             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
462             curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
463             curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
464
465         def _headerfunction(self, header_line):
466             header_line = header_line.decode('iso-8859-1')
467             if ':' in header_line:
468                 name, value = header_line.split(':', 1)
469                 name = name.strip().lower()
470                 value = value.strip()
471             elif self._headers:
472                 name = self._lastheadername
473                 value = self._headers[name] + ' ' + header_line.strip()
474             elif header_line.startswith('HTTP/'):
475                 name = 'x-status-line'
476                 value = header_line
477             else:
478                 _logger.error("Unexpected header line: %s", header_line)
479                 return
480             self._lastheadername = name
481             self._headers[name] = value
482             # Returning None implies all bytes were written
483     
484
485     class KeepWriterQueue(Queue.Queue):
486         def __init__(self, copies):
487             Queue.Queue.__init__(self) # Old-style superclass
488             self.wanted_copies = copies
489             self.successful_copies = 0
490             self.response = None
491             self.successful_copies_lock = threading.Lock()
492             self.pending_tries = copies
493             self.pending_tries_notification = threading.Condition()
494         
495         def write_success(self, response, replicas_nr):
496             with self.successful_copies_lock:
497                 self.successful_copies += replicas_nr
498                 self.response = response
499         
500         def write_fail(self, ks, status_code):
501             with self.pending_tries_notification:
502                 self.pending_tries += 1
503                 self.pending_tries_notification.notify()
504         
505         def pending_copies(self):
506             with self.successful_copies_lock:
507                 return self.wanted_copies - self.successful_copies
508     
509     
510     class KeepWriterThreadPool(object):
511         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
512             self.total_task_nr = 0
513             self.wanted_copies = copies
514             if (not max_service_replicas) or (max_service_replicas >= copies):
515                 num_threads = 1
516             else:
517                 num_threads = int(math.ceil(float(copies) / max_service_replicas))
518             _logger.debug("Pool max threads is %d", num_threads)
519             self.workers = []
520             self.queue = KeepClient.KeepWriterQueue(copies)
521             # Create workers
522             for _ in range(num_threads):
523                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
524                 self.workers.append(w)
525         
526         def add_task(self, ks, service_root):
527             self.queue.put((ks, service_root))
528             self.total_task_nr += 1
529         
530         def done(self):
531             return self.queue.successful_copies
532         
533         def join(self):
534             # Start workers
535             for worker in self.workers:
536                 worker.start()
537             # Wait for finished work
538             self.queue.join()
539             with self.queue.pending_tries_notification:
540                 self.queue.pending_tries_notification.notify_all()
541             for worker in self.workers:
542                 worker.join()
543         
544         def response(self):
545             return self.queue.response
546     
547     
    class KeepWriterThread(threading.Thread):
        """Worker that pulls (KeepService, service_root) tasks off a shared
        KeepWriterQueue and PUTs one block to each service."""

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout      # per-request timeout passed to KeepService.put()
            self.queue = queue          # shared KeepClient.KeepWriterQueue
            self.data = data            # block content to store
            self.data_hash = data_hash  # locator/hash string for the block

        def run(self):
            # Drain the queue; every get_nowait() must be matched by a
            # task_done() so KeepWriterThreadPool.join() can return.
            while not self.queue.empty():
                if self.queue.pending_copies() > 0:
                    # Avoid overreplication, wait for some needed re-attempt
                    with self.queue.pending_tries_notification:
                        if self.queue.pending_tries <= 0:
                            self.queue.pending_tries_notification.wait()
                            continue # try again when awake
                        self.queue.pending_tries -= 1

                    # Get to work
                    try:
                        service, service_root = self.queue.get_nowait()
                    except Queue.Empty:
                        # NOTE(review): pending_tries was decremented above
                        # and is not restored on this path (nor on the
                        # service.finished() path below) -- confirm this
                        # cannot starve other workers of retry slots.
                        continue
                    if service.finished():
                        # Service already succeeded or failed permanently:
                        # consume the task without another attempt.
                        self.queue.task_done()
                        continue
                    success = bool(service.put(self.data_hash,
                                                self.data,
                                                timeout=self.timeout))
                    result = service.last_result()
                    if success:
                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                                      str(threading.current_thread()),
                                      self.data_hash,
                                      len(self.data),
                                      service_root)
                        try:
                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                        except (KeyError, ValueError):
                            # Header absent or malformed: assume one replica.
                            replicas_stored = 1

                        self.queue.write_success(result['body'].strip(), replicas_stored)
                    else:
                        if result.get('status_code', None):
                            _logger.debug("Request fail: PUT %s => %s %s",
                                          self.data_hash,
                                          result['status_code'],
                                          result['body'])
                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
                    # Mark as done so the queue can be join()ed
                    self.queue.task_done()
                else:
                    # Enough copies already stored: drain remaining tasks
                    # without writing.
                    # Remove the task from the queue anyways
                    try:
                        self.queue.get_nowait()
                        # Mark as done so the queue can be join()ed
                        self.queue.task_done()
                    except Queue.Empty:
                        continue
607
608
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :block_cache:
          KeepBlockCache to use for this client.  If not specified, a new
          default-sized KeepBlockCache is created.
        """
        self.lock = threading.Lock()
        # Fall back to configuration/environment for any unspecified
        # settings; the resolution order below is deliberate.
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable curl handles shared by this client's KeepServices.
        self._user_agent_pool = Queue.LifoQueue()
        # Aggregate transfer/request statistics for this client.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            # Local-store mode: swap in the local_store_* implementations
            # of get/put and skip Keep service discovery entirely.
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                # One static pseudo-service: every request goes to the proxy.
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                # Service lists are populated lazily by build_services_list().
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
723
724     def current_timeout(self, attempt_number):
725         """Return the appropriate timeout to use for this client.
726
727         The proxy timeout setting if the backend service is currently a proxy,
728         the regular timeout setting otherwise.  The `attempt_number` indicates
729         how many times the operation has been tried already (starting from 0
730         for the first try), and scales the connection timeout portion of the
731         return value accordingly.
732
733         """
734         # TODO(twp): the timeout should be a property of a
735         # KeepService, not a KeepClient. See #4488.
736         t = self.proxy_timeout if self.using_proxy else self.timeout
737         if len(t) == 2:
738             return (t[0] * (1 << attempt_number), t[1])
739         else:
740             return (t[0] * (1 << attempt_number), t[1], t[2])
741     def _any_nondisk_services(self, service_list):
742         return any(ks.get('service_type', 'disk') != 'disk'
743                    for ks in service_list)
744
745     def build_services_list(self, force_rebuild=False):
746         if (self._static_services_list or
747               (self._keep_services and not force_rebuild)):
748             return
749         with self.lock:
750             try:
751                 keep_services = self.api_client.keep_services().accessible()
752             except Exception:  # API server predates Keep services.
753                 keep_services = self.api_client.keep_disks().list()
754
755             # Gateway services are only used when specified by UUID,
756             # so there's nothing to gain by filtering them by
757             # service_type.
758             self._gateway_services = {ks['uuid']: ks for ks in
759                                       keep_services.execute()['items']}
760             if not self._gateway_services:
761                 raise arvados.errors.NoKeepServersError()
762
763             # Precompute the base URI for each service.
764             for r in self._gateway_services.itervalues():
765                 host = r['service_host']
766                 if not host.startswith('[') and host.find(':') >= 0:
767                     # IPv6 URIs must be formatted like http://[::1]:80/...
768                     host = '[' + host + ']'
769                 r['_service_root'] = "{}://{}:{:d}/".format(
770                     'https' if r['service_ssl_flag'] else 'http',
771                     host,
772                     r['service_port'])
773
774             _logger.debug(str(self._gateway_services))
775             self._keep_services = [
776                 ks for ks in self._gateway_services.itervalues()
777                 if not ks.get('service_type', '').startswith('gateway:')]
778             self._writable_services = [ks for ks in self._keep_services
779                                        if not ks.get('read_only')]
780
781             # For disk type services, max_replicas_per_service is 1
782             # It is unknown (unlimited) for other service types.
783             if self._any_nondisk_services(self._writable_services):
784                 self.max_replicas_per_service = None
785             else:
786                 self.max_replicas_per_service = 1
787
788     def _service_weight(self, data_hash, service_uuid):
789         """Compute the weight of a Keep service endpoint for a data
790         block with a known hash.
791
792         The weight is md5(h + u) where u is the last 15 characters of
793         the service endpoint's UUID.
794         """
795         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
796
797     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
798         """Return an array of Keep service endpoints, in the order in
799         which they should be probed when reading or writing data with
800         the given hash+hints.
801         """
802         self.build_services_list(force_rebuild)
803
804         sorted_roots = []
805         # Use the services indicated by the given +K@... remote
806         # service hints, if any are present and can be resolved to a
807         # URI.
808         for hint in locator.hints:
809             if hint.startswith('K@'):
810                 if len(hint) == 7:
811                     sorted_roots.append(
812                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
813                 elif len(hint) == 29:
814                     svc = self._gateway_services.get(hint[2:])
815                     if svc:
816                         sorted_roots.append(svc['_service_root'])
817
818         # Sort the available local services by weight (heaviest first)
819         # for this locator, and return their service_roots (base URIs)
820         # in that order.
821         use_services = self._keep_services
822         if need_writable:
823             use_services = self._writable_services
824         self.using_proxy = self._any_nondisk_services(use_services)
825         sorted_roots.extend([
826             svc['_service_root'] for svc in sorted(
827                 use_services,
828                 reverse=True,
829                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
830         _logger.debug("{}: {}".format(locator, sorted_roots))
831         return sorted_roots
832
833     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
834         # roots_map is a dictionary, mapping Keep service root strings
835         # to KeepService objects.  Poll for Keep services, and add any
836         # new ones to roots_map.  Return the current list of local
837         # root strings.
838         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
839         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
840         for root in local_roots:
841             if root not in roots_map:
842                 roots_map[root] = self.KeepService(
843                     root, self._user_agent_pool,
844                     upload_counter=self.upload_counter,
845                     download_counter=self.download_counter,
846                     **headers)
847         return local_roots
848
849     @staticmethod
850     def _check_loop_result(result):
851         # KeepClient RetryLoops should save results as a 2-tuple: the
852         # actual result of the request, and the number of servers available
853         # to receive the request this round.
854         # This method returns True if there's a real result, False if
855         # there are no more servers available, otherwise None.
856         if isinstance(result, Exception):
857             return None
858         result, tried_server_count = result
859         if (result is not None) and (result is not False):
860             return True
861         elif tried_server_count < 1:
862             _logger.info("No more Keep services to try; giving up")
863             return False
864         else:
865             return None
866
867     def get_from_cache(self, loc):
868         """Fetch a block only if is in the cache, otherwise return None."""
869         slot = self.block_cache.get(loc)
870         if slot is not None and slot.ready.is_set():
871             return slot.get()
872         else:
873             return None
874
875     @retry.retry_method
876     def head(self, loc_s, num_retries=None):
877         return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
878
879     @retry.retry_method
880     def get(self, loc_s, num_retries=None):
881         return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
882
883     def _get_or_head(self, loc_s, method="GET", num_retries=None):
884         """Get data from Keep.
885
886         This method fetches one or more blocks of data from Keep.  It
887         sends a request each Keep service registered with the API
888         server (or the proxy provided when this client was
889         instantiated), then each service named in location hints, in
890         sequence.  As soon as one service provides the data, it's
891         returned.
892
893         Arguments:
894         * loc_s: A string of one or more comma-separated locators to fetch.
895           This method returns the concatenation of these blocks.
896         * num_retries: The number of times to retry GET requests to
897           *each* Keep server if it returns temporary failures, with
898           exponential backoff.  Note that, in each loop, the method may try
899           to fetch data from every available Keep service, along with any
900           that are named in location hints in the locator.  The default value
901           is set when the KeepClient is initialized.
902         """
903         if ',' in loc_s:
904             return ''.join(self.get(x) for x in loc_s.split(','))
905
906         self.get_counter.add(1)
907
908         locator = KeepLocator(loc_s)
909         if method == "GET":
910             slot, first = self.block_cache.reserve_cache(locator.md5sum)
911             if not first:
912                 self.hits_counter.add(1)
913                 v = slot.get()
914                 return v
915
916         self.misses_counter.add(1)
917
918         # If the locator has hints specifying a prefix (indicating a
919         # remote keepproxy) or the UUID of a local gateway service,
920         # read data from the indicated service(s) instead of the usual
921         # list of local disk services.
922         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
923                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
924         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
925                            for hint in locator.hints if (
926                                    hint.startswith('K@') and
927                                    len(hint) == 29 and
928                                    self._gateway_services.get(hint[2:])
929                                    )])
930         # Map root URLs to their KeepService objects.
931         roots_map = {
932             root: self.KeepService(root, self._user_agent_pool,
933                                    upload_counter=self.upload_counter,
934                                    download_counter=self.download_counter)
935             for root in hint_roots
936         }
937
938         # See #3147 for a discussion of the loop implementation.  Highlights:
939         # * Refresh the list of Keep services after each failure, in case
940         #   it's being updated.
941         # * Retry until we succeed, we're out of retries, or every available
942         #   service has returned permanent failure.
943         sorted_roots = []
944         roots_map = {}
945         blob = None
946         loop = retry.RetryLoop(num_retries, self._check_loop_result,
947                                backoff_start=2)
948         for tries_left in loop:
949             try:
950                 sorted_roots = self.map_new_services(
951                     roots_map, locator,
952                     force_rebuild=(tries_left < num_retries),
953                     need_writable=False)
954             except Exception as error:
955                 loop.save_result(error)
956                 continue
957
958             # Query KeepService objects that haven't returned
959             # permanent failure, in our specified shuffle order.
960             services_to_try = [roots_map[root]
961                                for root in sorted_roots
962                                if roots_map[root].usable()]
963             for keep_service in services_to_try:
964                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
965                 if blob is not None:
966                     break
967             loop.save_result((blob, len(services_to_try)))
968
969         # Always cache the result, then return it if we succeeded.
970         if method == "GET":
971             slot.set(blob)
972             self.block_cache.cap_cache()
973         if loop.success():
974             if method == "HEAD":
975                 return True
976             else:
977                 return blob
978
979         # Q: Including 403 is necessary for the Keep tests to continue
980         # passing, but maybe they should expect KeepReadError instead?
981         not_founds = sum(1 for key in sorted_roots
982                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
983         service_errors = ((key, roots_map[key].last_result()['error'])
984                           for key in sorted_roots)
985         if not roots_map:
986             raise arvados.errors.KeepReadError(
987                 "failed to read {}: no Keep services available ({})".format(
988                     loc_s, loop.last_result()))
989         elif not_founds == len(sorted_roots):
990             raise arvados.errors.NotFoundError(
991                 "{} not found".format(loc_s), service_errors)
992         else:
993             raise arvados.errors.KeepReadError(
994                 "failed to read {}".format(loc_s), service_errors, label="service")
995
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        # Keep blocks are byte strings; unicode input must be plain
        # ASCII text (Python 2 semantics).
        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        # Zero copies requested: the locator is still meaningful, so
        # return it without contacting any Keep service.
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replicas'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # Replicas confirmed written so far, accumulated across retries.
        done = 0
        for tries_left in loop:
            try:
                # force_rebuild refreshes the service list on every
                # round after the first, in case it is being updated.
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Each round only needs to write the copies still missing.
            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
                                                        data_hash=data_hash,
                                                        copies=copies - done,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left))
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                # Skip services that already accepted this block.
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            done += writer_pool.done()
            loop.save_result((done >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, writer_pool.done()), service_errors, label="service")
1072
1073     def local_store_put(self, data, copies=1, num_retries=None):
1074         """A stub for put().
1075
1076         This method is used in place of the real put() method when
1077         using local storage (see constructor's local_store argument).
1078
1079         copies and num_retries arguments are ignored: they are here
1080         only for the sake of offering the same call signature as
1081         put().
1082
1083         Data stored this way can be retrieved via local_store_get().
1084         """
1085         md5 = hashlib.md5(data).hexdigest()
1086         locator = '%s+%d' % (md5, len(data))
1087         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1088             f.write(data)
1089         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1090                   os.path.join(self.local_store, md5))
1091         return locator
1092
1093     def local_store_get(self, loc_s, num_retries=None):
1094         """Companion to local_store_put()."""
1095         try:
1096             locator = KeepLocator(loc_s)
1097         except ValueError:
1098             raise arvados.errors.NotFoundError(
1099                 "Invalid data locator: '%s'" % loc_s)
1100         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1101             return ''
1102         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1103             return f.read()
1104
1105     def is_cached(self, locator):
1106         return self.block_cache.reserve_cache(expect_hash)