8937: add head request to python keep client.
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import threading
13 import timer
14
15 import arvados
16 import arvados.config as config
17 import arvados.errors
18 import arvados.retry as retry
19 import arvados.util
20
# Module-level logger shared by every class in this file.
_logger = logging.getLogger('arvados.keep')
# Singleton KeepClient managed by Keep.global_client_object(); rebuilt
# whenever the relevant Arvados configuration changes.
global_client_object = None
23
24
class KeepLocator(object):
    """Parse and re-serialize a Keep block locator string.

    A locator looks like "<md5sum>[+<size>][+<hint>...]".  'A' hints
    carry a permission signature and hex expiry timestamp and are parsed
    into perm_sig/perm_expiry; all other hints are kept verbatim in
    self.hints.
    """

    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare locators have no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator with all hints removed."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A' permission hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Parse an 'A<signature>@<hex timestamp>' hint into this locator.

        Raises ValueError with a "bad permission hint" message when the
        hint is malformed.  (A hint without '@' makes the two-way unpack
        raise ValueError, not IndexError, so the original `except
        IndexError` never fired and callers saw a bare unpacking error.)
        """
        try:
            perm_sig, perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            raise ValueError("bad permission hint {}".format(s))
        # Assign outside the try so the setters' more specific hex
        # validation errors propagate unchanged, as before.
        self.perm_sig = perm_sig
        self.perm_expiry = perm_expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        perm_expiry is derived via utcfromtimestamp (UTC), so the
        default comparison point must be UTC now as well; the original
        compared against local time, mis-judging expiry in any non-UTC
        timezone.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
106
107
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Configuration snapshot the current global client was built from.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        global global_client_object
        # KeepClient used to adjust its behavior at runtime whenever these
        # settings changed.  Emulate that here: rebuild the shared client
        # any time the relevant configuration differs from the last call.
        settings = (config.get('ARVADOS_API_HOST'),
                    config.get('ARVADOS_API_TOKEN'),
                    config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
                    config.get('ARVADOS_KEEP_PROXY'),
                    config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
                    os.environ.get('KEEP_LOCAL_STORE'))
        if global_client_object is None or cls._last_key != settings:
            global_client_object = KeepClient()
            cls._last_key = settings
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block via the shared global KeepClient."""
        client = Keep.global_client_object()
        return client.get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block via the shared global KeepClient."""
        client = Keep.global_client_object()
        return client.put(data, **kwargs)
142
class KeepBlockCache(object):
    """In-memory LRU cache of Keep block contents.

    Slots are kept in most-recently-used-first order.  A slot can be
    reserved before its content arrives; readers block on the slot's
    event until a writer calls set().
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until content has been set, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            self.content = value
            self.ready.set()

        def size(self):
            """Size in bytes of the cached content (0 while pending)."""
            return 0 if self.content is None else len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Drop slots where ready.is_set() and content is None (that
            # means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            # Maintain a running total instead of re-summing every
            # eviction (the original recomputed the sum per iteration).
            total = sum(slot.size() for slot in self._cache)
            while self._cache and total > self.cache_max:
                # Evict the least recently used slot whose content has
                # arrived; in-flight slots cannot be dropped.
                for i in range(len(self._cache) - 1, -1, -1):
                    if self._cache[i].ready.is_set():
                        total -= self._cache[i].size()
                        del self._cache[i]
                        break
                else:
                    # Nothing evictable yet.  The original would loop
                    # forever in this state; bail out instead.
                    break

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # Move it to the front (most recently used).
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the cache slot for locator, or None.  Thread-safe."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator, reserve=True):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns (slot, is_new).  When reserve is false-y, never creates
        a slot: returns (None, False) on a miss.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            elif reserve:
                # Add a new cache slot for the locator.  Accept any truthy
                # value here; the original compared `reserve == True`,
                # which silently rejected truthy non-True arguments.
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
            else:
                return None, False
216
class Counter(object):
    """Thread-safe integer accumulator used for transfer statistics."""

    def __init__(self, v=0):
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically add v (which may be negative) to the counter."""
        with self._lock:
            self._value += v

    def get(self):
        """Return the current counter value."""
        with self._lock:
            return self._value
230
231 class KeepClient(object):
232
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    # Each tuple is (connection_timeout, read_timeout, minimum_bandwidth);
    # see current_timeout() and _setcurltimeouts() for how they are applied.
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
241
    class ThreadLimiter(object):
        """Limit the number of threads writing to Keep at once.

        This ensures that only a number of writer threads that could
        potentially achieve the desired replication level run at once.
        Once the desired replication level is achieved, queued threads
        are instructed not to run.

        Should be used in a "with" block.
        """
        def __init__(self, want_copies, max_service_replicas):
            # Count of threads that have entered the "with" block so far;
            # consulted by __enter__ when set_sequence() ordering is used.
            self._started = 0
            self._want_copies = want_copies
            # Replicas confirmed stored so far, across all threads.
            self._done = 0
            self._response = None
            # Condition guarding _started, used to enforce start ordering.
            self._start_lock = threading.Condition()
            if (not max_service_replicas) or (max_service_replicas >= want_copies):
                max_threads = 1
            else:
                # Enough concurrent writers to reach want_copies when each
                # service stores at most max_service_replicas copies.
                max_threads = math.ceil(float(want_copies) / max_service_replicas)
            _logger.debug("Limiter max threads is %d", max_threads)
            # Semaphore bounding how many writers run concurrently.
            self._todo_lock = threading.Semaphore(max_threads)
            self._done_lock = threading.Lock()
            # Per-thread storage for the optional start sequence number.
            self._local = threading.local()

        def __enter__(self):
            self._start_lock.acquire()
            if getattr(self._local, 'sequence', None) is not None:
                # If the calling thread has used set_sequence(N), then
                # we wait here until N other threads have started.
                while self._started < self._local.sequence:
                    self._start_lock.wait()
            self._todo_lock.acquire()
            self._started += 1
            self._start_lock.notifyAll()
            self._start_lock.release()
            return self

        def __exit__(self, type, value, traceback):
            self._todo_lock.release()

        def set_sequence(self, sequence):
            # Ask a later __enter__ on this same thread to block until
            # `sequence` other threads have already started.
            self._local.sequence = sequence

        def shall_i_proceed(self):
            """
            Return true if the current thread should write to Keep.
            Return false otherwise.
            """
            with self._done_lock:
                return (self._done < self._want_copies)

        def save_response(self, response_body, replicas_stored):
            """
            Records a response body (a locator, possibly signed) returned by
            the Keep server, and the number of replicas it stored.
            """
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

        def response(self):
            """Return the body from the response to a PUT request."""
            with self._done_lock:
                return self._response

        def done(self):
            """Return the total number of replicas successfully stored."""
            with self._done_lock:
                return self._done
312
    class KeepService(object):
        """Make requests to a single Keep service, and track results.

        A KeepService is intended to last long enough to perform one
        transaction (GET or PUT) against one Keep service. This can
        involve calling either get() or put() multiple times in order
        to retry after transient failures. However, calling both get()
        and put() on a single instance -- or using the same instance
        to access two different Keep services -- will not produce
        sensible behavior.
        """

        # Exception types that indicate a (possibly transient) transport
        # failure, as opposed to an HTTP-level error response.
        HTTP_ERRORS = (
            socket.error,
            ssl.SSLError,
            arvados.errors.HttpError,
        )

        def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                     upload_counter=None,
                     download_counter=None, **headers):
            # NOTE(review): the Queue.LifoQueue() default is evaluated once
            # at class-definition time, so instances created without an
            # explicit pool share one module-wide curl-handle pool --
            # confirm this sharing is intended.
            self.root = root
            self._user_agent_pool = user_agent_pool
            self._result = {'error': None}
            self._usable = True
            self._session = None
            # GET requests carry an Accept header plus any caller headers;
            # PUT requests carry only the caller-supplied headers.
            self.get_headers = {'Accept': 'application/octet-stream'}
            self.get_headers.update(headers)
            self.put_headers = headers
            self.upload_counter = upload_counter
            self.download_counter = download_counter

        def usable(self):
            """Is it worth attempting a request?"""
            return self._usable

        def finished(self):
            """Did the request succeed or encounter permanent failure?"""
            return self._result['error'] == False or not self._usable

        def last_result(self):
            # Dict with 'error' plus, after an HTTP exchange completes,
            # 'status_code', 'body', and 'headers'.
            return self._result

        def _get_user_agent(self):
            # Reuse a pooled curl handle if one is free; otherwise make one.
            try:
                return self._user_agent_pool.get(False)
            except Queue.Empty:
                return pycurl.Curl()

        def _put_user_agent(self, ua):
            # Return a curl handle to the pool for later reuse.
            try:
                ua.reset()
                self._user_agent_pool.put(ua, False)
            except:
                # Best effort: if the handle can't be reset or pooled, just
                # close it.  NOTE(review): bare except also swallows
                # KeyboardInterrupt/SystemExit; `except Exception` would be
                # safer.
                ua.close()

        @staticmethod
        def _socket_open(family, socktype, protocol, address=None):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(family, socktype, protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Probe after 75s of idle, then every 75s.
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            return s

        def get(self, locator, method="GET", timeout=None):
            """Fetch (GET) or probe (HEAD) one block from this service.

            Returns the block contents on a successful GET, the content
            length as a string on a successful HEAD, or None on failure;
            details are recorded in self._result either way.
            """
            # locator is a KeepLocator object.
            url = self.root + str(locator)
            _logger.debug("Request: %s %s", method, url)
            curl = self._get_user_agent()
            # ok stays None on transport errors, False on HTTP errors,
            # True on success.
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if method == "HEAD":
                        # HEAD: tell curl not to expect a response body.
                        curl.setopt(pycurl.NOBODY, True)
                    self._setcurltimeouts(curl, timeout)

                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize any pycurl failure into our HttpError.
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }

                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            # Transport errors (ok is None) mark the service unusable.
            self._usable = ok != False
            if self._result.get('status_code', None):
                # The client worked well enough to get an HTTP status
                # code, so presumably any problems are just on the
                # server side and it's OK to reuse the client.
                self._put_user_agent(curl)
            else:
                # Don't return this client to the pool, in case it's
                # broken.
                curl.close()
            if not ok:
                _logger.debug("Request fail: GET %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return None
            if method == "HEAD":
                _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._headers.get('content-length'))
                # Report the size from the Content-Length header, falling
                # back to the (normally empty for HEAD) response body.
                content_len = self._headers.get('content-length')
                if content_len is None:
                    content_len = self._result['body']
                return str(content_len)

            _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(self._result['body']),
                         t.msecs,
                         (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)

            if self.download_counter:
                self.download_counter.add(len(self._result['body']))
            # Guard against corruption in transit: the body must hash back
            # to the locator's md5.
            resp_md5 = hashlib.md5(self._result['body']).hexdigest()
            if resp_md5 != locator.md5sum:
                _logger.warning("Checksum fail: md5(%s) = %s",
                                url, resp_md5)
                self._result['error'] = arvados.errors.HttpError(
                    0, 'Checksum fail')
                return None
            return self._result['body']

        def put(self, hash_s, body, timeout=None):
            """Upload body to this service under the given hash.

            Returns True on success, False otherwise; details are
            recorded in self._result.
            """
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = cStringIO.StringIO(body)
                    response_body = cStringIO.StringIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        # Normalize any pycurl failure into our HttpError.
                        raise arvados.errors.HttpError(0, str(e))
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue(),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True

        def _setcurltimeouts(self, curl, timeouts):
            """Apply (connect, read, bandwidth) timeouts to a curl handle.

            timeouts may be None (no-op), a scalar used for both connect
            and read, a 2-tuple, or a 3-tuple that also carries a minimum
            bandwidth in bytes/second.
            """
            if not timeouts:
                return
            elif isinstance(timeouts, tuple):
                if len(timeouts) == 2:
                    conn_t, xfer_t = timeouts
                    bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
                else:
                    conn_t, xfer_t, bandwidth_bps = timeouts
            else:
                conn_t, xfer_t = (timeouts, timeouts)
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
            # Abort the transfer if it stays below bandwidth_bps for
            # xfer_t seconds (curl's LOW_SPEED_* pair).
            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

        def _headerfunction(self, header_line):
            """pycurl HEADERFUNCTION callback: accumulate response headers.

            Header names are lowercased; folded continuation lines are
            appended to the previous header; the HTTP status line is
            stored under the synthetic key 'x-status-line'.
            """
            header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # Continuation (folded) line of the previous header.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
557             # Returning None implies all bytes were written
558
559
560     class KeepWriterThread(threading.Thread):
561         """
562         Write a blob of data to the given Keep server. On success, call
563         save_response() of the given ThreadLimiter to save the returned
564         locator.
565         """
566         def __init__(self, keep_service, **kwargs):
567             super(KeepClient.KeepWriterThread, self).__init__()
568             self.service = keep_service
569             self.args = kwargs
570             self._success = False
571
572         def success(self):
573             return self._success
574
575         def run(self):
576             limiter = self.args['thread_limiter']
577             sequence = self.args['thread_sequence']
578             if sequence is not None:
579                 limiter.set_sequence(sequence)
580             with limiter:
581                 if not limiter.shall_i_proceed():
582                     # My turn arrived, but the job has been done without
583                     # me.
584                     return
585                 self.run_with_limiter(limiter)
586
587         def run_with_limiter(self, limiter):
588             if self.service.finished():
589                 return
590             _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
591                           str(threading.current_thread()),
592                           self.args['data_hash'],
593                           len(self.args['data']),
594                           self.args['service_root'])
595             self._success = bool(self.service.put(
596                 self.args['data_hash'],
597                 self.args['data'],
598                 timeout=self.args.get('timeout', None)))
599             result = self.service.last_result()
600             if self._success:
601                 _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
602                               str(threading.current_thread()),
603                               self.args['data_hash'],
604                               len(self.args['data']),
605                               self.args['service_root'])
606                 # Tick the 'done' counter for the number of replica
607                 # reported stored by the server, for the case that
608                 # we're talking to a proxy or other backend that
609                 # stores to multiple copies for us.
610                 try:
611                     replicas_stored = int(result['headers']['x-keep-replicas-stored'])
612                 except (KeyError, ValueError):
613                     replicas_stored = 1
614                 limiter.save_response(result['body'].strip(), replicas_stored)
615             elif result.get('status_code', None):
616                 _logger.debug("Request fail: PUT %s => %s %s",
617                               self.args['data_hash'],
618                               result['status_code'],
619                               result['body'])
620
621
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
          KeepClient does not use a proxy, pass in an empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :block_cache:
          KeepBlockCache to use for get() results.  If not provided, a
          new KeepBlockCache is created.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :session:
          NOTE(review): accepted but not referenced anywhere in this
          constructor -- presumably retained for interface compatibility;
          confirm before removing.
        """
        self.lock = threading.Lock()
        if proxy is None:
            proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable pycurl handles shared by this client's
        # KeepService instances.
        self._user_agent_pool = Queue.LifoQueue()
        # Transfer/cache statistics, updated as requests are made.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()

        if local_store:
            # Local-store mode: replace get/put with filesystem-backed
            # implementations (primarily for testing).
            self.local_store = local_store
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                if not proxy.endswith('/'):
                    proxy += '/'
                self.api_token = api_token
                self._gateway_services = {}
                # With a proxy, the service list is static: one synthetic
                # "proxy" service entry.
                self._keep_services = [{
                    'uuid': 'proxy',
                    'service_type': 'proxy',
                    '_service_root': proxy,
                    }]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
736
737     def current_timeout(self, attempt_number):
738         """Return the appropriate timeout to use for this client.
739
740         The proxy timeout setting if the backend service is currently a proxy,
741         the regular timeout setting otherwise.  The `attempt_number` indicates
742         how many times the operation has been tried already (starting from 0
743         for the first try), and scales the connection timeout portion of the
744         return value accordingly.
745
746         """
747         # TODO(twp): the timeout should be a property of a
748         # KeepService, not a KeepClient. See #4488.
749         t = self.proxy_timeout if self.using_proxy else self.timeout
750         if len(t) == 2:
751             return (t[0] * (1 << attempt_number), t[1])
752         else:
753             return (t[0] * (1 << attempt_number), t[1], t[2])
754     def _any_nondisk_services(self, service_list):
755         return any(ks.get('service_type', 'disk') != 'disk'
756                    for ks in service_list)
757
    def build_services_list(self, force_rebuild=False):
        """Refresh the cached Keep services lists from the API server.

        Does nothing when a static services list was configured
        (proxy mode), or when a list is already cached and
        force_rebuild is false.  Otherwise rebuilds
        self._gateway_services, self._keep_services, and
        self._writable_services under self.lock, and recomputes
        self.max_replicas_per_service.

        Raises arvados.errors.NoKeepServersError when the API server
        reports no accessible services.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.itervalues():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Read candidates are everything except gateway-only
            # services; write candidates additionally exclude
            # read-only services.
            self._keep_services = [
                ks for ks in self._gateway_services.itervalues()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
800
801     def _service_weight(self, data_hash, service_uuid):
802         """Compute the weight of a Keep service endpoint for a data
803         block with a known hash.
804
805         The weight is md5(h + u) where u is the last 15 characters of
806         the service endpoint's UUID.
807         """
808         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
809
810     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
811         """Return an array of Keep service endpoints, in the order in
812         which they should be probed when reading or writing data with
813         the given hash+hints.
814         """
815         self.build_services_list(force_rebuild)
816
817         sorted_roots = []
818         # Use the services indicated by the given +K@... remote
819         # service hints, if any are present and can be resolved to a
820         # URI.
821         for hint in locator.hints:
822             if hint.startswith('K@'):
823                 if len(hint) == 7:
824                     sorted_roots.append(
825                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
826                 elif len(hint) == 29:
827                     svc = self._gateway_services.get(hint[2:])
828                     if svc:
829                         sorted_roots.append(svc['_service_root'])
830
831         # Sort the available local services by weight (heaviest first)
832         # for this locator, and return their service_roots (base URIs)
833         # in that order.
834         use_services = self._keep_services
835         if need_writable:
836             use_services = self._writable_services
837         self.using_proxy = self._any_nondisk_services(use_services)
838         sorted_roots.extend([
839             svc['_service_root'] for svc in sorted(
840                 use_services,
841                 reverse=True,
842                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
843         _logger.debug("{}: {}".format(locator, sorted_roots))
844         return sorted_roots
845
846     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
847         # roots_map is a dictionary, mapping Keep service root strings
848         # to KeepService objects.  Poll for Keep services, and add any
849         # new ones to roots_map.  Return the current list of local
850         # root strings.
851         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
852         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
853         for root in local_roots:
854             if root not in roots_map:
855                 roots_map[root] = self.KeepService(
856                     root, self._user_agent_pool,
857                     upload_counter=self.upload_counter,
858                     download_counter=self.download_counter,
859                     **headers)
860         return local_roots
861
862     @staticmethod
863     def _check_loop_result(result):
864         # KeepClient RetryLoops should save results as a 2-tuple: the
865         # actual result of the request, and the number of servers available
866         # to receive the request this round.
867         # This method returns True if there's a real result, False if
868         # there are no more servers available, otherwise None.
869         if isinstance(result, Exception):
870             return None
871         result, tried_server_count = result
872         if (result is not None) and (result is not False):
873             return True
874         elif tried_server_count < 1:
875             _logger.info("No more Keep services to try; giving up")
876             return False
877         else:
878             return None
879
880     def get_from_cache(self, loc):
881         """Fetch a block only if is in the cache, otherwise return None."""
882         slot = self.block_cache.get(loc)
883         if slot is not None and slot.ready.is_set():
884             return slot.get()
885         else:
886             return None
887
    @retry.retry_method
    def head(self, loc_s, num_retries=None):
        # Check a block's availability in Keep without fetching its
        # data; same arguments and retry semantics as get().  See
        # _get_or_head() for the full contract.
        return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
891
    @retry.retry_method
    def get(self, loc_s, num_retries=None):
        # Fetch block data from Keep.  See _get_or_head() for the full
        # contract and argument documentation.
        return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
895
896     def _get_or_head(self, loc_s, method="GET", num_retries=None):
897         """Get data from Keep.
898
899         This method fetches one or more blocks of data from Keep.  It
900         sends a request each Keep service registered with the API
901         server (or the proxy provided when this client was
902         instantiated), then each service named in location hints, in
903         sequence.  As soon as one service provides the data, it's
904         returned.
905
906         Arguments:
907         * loc_s: A string of one or more comma-separated locators to fetch.
908           This method returns the concatenation of these blocks.
909         * num_retries: The number of times to retry GET requests to
910           *each* Keep server if it returns temporary failures, with
911           exponential backoff.  Note that, in each loop, the method may try
912           to fetch data from every available Keep service, along with any
913           that are named in location hints in the locator.  The default value
914           is set when the KeepClient is initialized.
915         """
916         if ',' in loc_s:
917             return ''.join(self.get(x) for x in loc_s.split(','))
918
919         self.get_counter.add(1)
920
921         locator = KeepLocator(loc_s)
922         slot, first = self.block_cache.reserve_cache(locator.md5sum, True if method == "GET" else False)
923         if not first and slot is not None:
924             self.hits_counter.add(1)
925             v = slot.get()
926             if method == "HEAD":
927                 return str(len(v))
928             else:
929                 return v
930
931         self.misses_counter.add(1)
932
933         # If the locator has hints specifying a prefix (indicating a
934         # remote keepproxy) or the UUID of a local gateway service,
935         # read data from the indicated service(s) instead of the usual
936         # list of local disk services.
937         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
938                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
939         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
940                            for hint in locator.hints if (
941                                    hint.startswith('K@') and
942                                    len(hint) == 29 and
943                                    self._gateway_services.get(hint[2:])
944                                    )])
945         # Map root URLs to their KeepService objects.
946         roots_map = {
947             root: self.KeepService(root, self._user_agent_pool,
948                                    upload_counter=self.upload_counter,
949                                    download_counter=self.download_counter)
950             for root in hint_roots
951         }
952
953         # See #3147 for a discussion of the loop implementation.  Highlights:
954         # * Refresh the list of Keep services after each failure, in case
955         #   it's being updated.
956         # * Retry until we succeed, we're out of retries, or every available
957         #   service has returned permanent failure.
958         sorted_roots = []
959         roots_map = {}
960         blob = None
961         loop = retry.RetryLoop(num_retries, self._check_loop_result,
962                                backoff_start=2)
963         for tries_left in loop:
964             try:
965                 sorted_roots = self.map_new_services(
966                     roots_map, locator,
967                     force_rebuild=(tries_left < num_retries),
968                     need_writable=False)
969             except Exception as error:
970                 loop.save_result(error)
971                 continue
972
973             # Query KeepService objects that haven't returned
974             # permanent failure, in our specified shuffle order.
975             services_to_try = [roots_map[root]
976                                for root in sorted_roots
977                                if roots_map[root].usable()]
978             for keep_service in services_to_try:
979                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
980                 if blob is not None:
981                     break
982             loop.save_result((blob, len(services_to_try)))
983
984         # Always cache the result, then return it if we succeeded.
985         if method == "GET":
986             slot.set(blob)
987             self.block_cache.cap_cache()
988         if loop.success():
989             return blob
990
991         # Q: Including 403 is necessary for the Keep tests to continue
992         # passing, but maybe they should expect KeepReadError instead?
993         not_founds = sum(1 for key in sorted_roots
994                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
995         service_errors = ((key, roots_map[key].last_result()['error'])
996                           for key in sorted_roots)
997         if not roots_map:
998             raise arvados.errors.KeepReadError(
999                 "failed to read {}: no Keep services available ({})".format(
1000                     loc_s, loop.last_result()))
1001         elif not_founds == len(sorted_roots):
1002             raise arvados.errors.NotFoundError(
1003                 "{} not found".format(loc_s), service_errors)
1004         else:
1005             raise arvados.errors.KeepReadError(
1006                 "failed to read {}".format(loc_s), service_errors, label="service")
1007
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        """

        if isinstance(data, unicode):
            data = data.encode("ascii")
        elif not isinstance(data, str):
            raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        # Zero copies requested: nothing to upload, but the locator is
        # still valid for this data.
        if copies < 1:
            return loc_s
        locator = KeepLocator(loc_s)

        headers = {}
        # Tell the proxy how many copies we want it to store
        headers['X-Keep-Desired-Replication'] = str(copies)
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        # done counts copies confirmed written across retry rounds.
        done = 0
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # One writer thread per service that hasn't already
            # finished; ThreadLimiter presumably caps concurrent
            # uploads to the copies still needed -- confirm against
            # its definition elsewhere in this class.
            thread_limiter = KeepClient.ThreadLimiter(
                copies - done, self.max_replicas_per_service)
            threads = []
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                t = KeepClient.KeepWriterThread(
                    ks,
                    data=data,
                    data_hash=data_hash,
                    service_root=service_root,
                    thread_limiter=thread_limiter,
                    timeout=self.current_timeout(num_retries-tries_left),
                    thread_sequence=len(threads))
                t.start()
                threads.append(t)
            for t in threads:
                t.join()
            # Tally this round's copies; the loop retries while the
            # total is still short of the request.
            done += thread_limiter.done()
            loop.save_result((done >= copies, len(threads)))

        if loop.success():
            return thread_limiter.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "failed to write {}: no Keep services available ({})".format(
                    data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "failed to write {} (wanted {} copies but wrote {})".format(
                    data_hash, copies, thread_limiter.done()), service_errors, label="service")
1092
1093     def local_store_put(self, data, copies=1, num_retries=None):
1094         """A stub for put().
1095
1096         This method is used in place of the real put() method when
1097         using local storage (see constructor's local_store argument).
1098
1099         copies and num_retries arguments are ignored: they are here
1100         only for the sake of offering the same call signature as
1101         put().
1102
1103         Data stored this way can be retrieved via local_store_get().
1104         """
1105         md5 = hashlib.md5(data).hexdigest()
1106         locator = '%s+%d' % (md5, len(data))
1107         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1108             f.write(data)
1109         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1110                   os.path.join(self.local_store, md5))
1111         return locator
1112
1113     def local_store_get(self, loc_s, num_retries=None):
1114         """Companion to local_store_put()."""
1115         try:
1116             locator = KeepLocator(loc_s)
1117         except ValueError:
1118             raise arvados.errors.NotFoundError(
1119                 "Invalid data locator: '%s'" % loc_s)
1120         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1121             return ''
1122         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1123             return f.read()
1124
1125     def is_cached(self, locator):
1126         return self.block_cache.reserve_cache(expect_hash)