Ready to start testing code
[arvados.git] / sdk / python / arvados / keep.py
1 import cStringIO
2 import datetime
3 import hashlib
4 import logging
5 import math
6 import os
7 import pycurl
8 import Queue
9 import re
10 import socket
11 import ssl
12 import threading
13 import timer
14
15 import arvados
16 import arvados.config as config
17 import arvados.errors
18 import arvados.retry as retry
19 import arvados.util
20
21 _logger = logging.getLogger('arvados.keep')
22 global_client_object = None
23
24
class KeepLocator(object):
    """Parsed form of a Keep block locator string.

    A locator is a '+'-separated string: a 32-digit hex md5sum,
    optionally followed by an integer size and any number of hints.
    A hint beginning with 'A' is parsed as a permission hint
    (``A<40-digit hex signature>@<hex Unix timestamp>``); every other
    hint is kept verbatim, in order, in ``self.hints``.
    """
    # Naive datetime for the Unix epoch, used to convert perm_expiry
    # back into a timestamp.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str into md5sum, size, permission data and hints.

        Raises ValueError if the md5sum, the size, or any hint is
        malformed.
        """
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # A bare md5sum with no size is allowed.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        """Return the locator string, omitting any absent components."""
        return '+'.join(
            str(s) for s in [self.md5sum, self.size,
                             self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without hints: 'md5sum+size' or 'md5sum'."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.  This runs while
        # the class body is being executed, so it takes no self.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        """Permission expiry as a naive UTC datetime, or None."""
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        # value is the hex timestamp string taken from the hint.
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the 'A<sig>@<hex timestamp>' hint, or None if incomplete."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Set perm_sig and perm_expiry from a hint string 'A<sig>@<ts>'.

        Raises ValueError if the hint has no '@' separator, or if
        either half fails the property validation.
        """
        # A missing '@' used to surface as a tuple-unpacking ValueError
        # that the old `except IndexError` never caught; detect it
        # explicitly so the caller gets the intended message.
        sig, separator, expiry = s[1:].partition('@')
        if not separator:
            raise ValueError("bad permission hint {}".format(s))
        self.perm_sig, self.perm_expiry = sig, expiry

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission hint has expired.

        as_of_dt is a naive UTC datetime to compare against; it
        defaults to the current UTC time.  Returns False when the
        locator carries no permission expiry.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is built with utcfromtimestamp(), so the
            # reference time must be UTC as well, not local time.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
106
107
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Configuration snapshot the current global client was built for.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if settings changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        key = (config.get('ARVADOS_API_HOST'),
               config.get('ARVADOS_API_TOKEN'),
               config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
               config.get('ARVADOS_KEEP_PROXY'),
               config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
               os.environ.get('KEEP_LOCAL_STORE'))
        if (global_client_object is None) or (cls._last_key != key):
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Forward to get() on the global KeepClient."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Forward to put() on the global KeepClient."""
        return Keep.global_client_object().put(data, **kwargs)
142
class KeepBlockCache(object):
    """In-memory cache of Keep blocks, most recently used first.

    Slots live in a list ordered from most to least recently used;
    cap_cache() evicts from the tail until the total content size is
    within cache_max bytes.
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block; readers block in get() until set() is called."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Wait until the slot has been filled, then return its content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store value (None signals a failed read) and wake all waiters."""
            self.content = value
            self.ready.set()

        def size(self):
            """Return the content length in bytes, or 0 if there is none."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            total = sum(slot.size() for slot in self._cache)
            # Walk from the least recently used end, evicting finished
            # slots and keeping a running total instead of re-summing
            # the whole cache after every eviction.  Slots still being
            # filled are skipped; one pass is enough, so this never
            # spins while holding the lock.
            for i in range(len(self._cache) - 1, -1, -1):
                if total <= self.cache_max:
                    break
                slot = self._cache[i]
                if slot.ready.is_set():
                    total -= slot.size()
                    del self._cache[i]

    def _get(self, locator):
        # Caller must hold self._cache_lock.
        # Test if the locator is already in the cache
        for i, slot in enumerate(self._cache):
            if slot.locator == locator:
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, slot)
                return slot
        return None

    def get(self, locator):
        """Return the slot for locator (marking it most recent), or None."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.

        Returns (slot, created); created is True only when a new,
        still-empty slot was inserted.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
214
class Counter(object):
    """Integer accumulator whose updates and reads are guarded by a lock."""

    def __init__(self, v=0):
        self._lk = threading.Lock()
        self._val = v

    def add(self, v):
        """Add v to the running total."""
        with self._lk:
            self._val += v

    def get(self):
        """Return the current total."""
        with self._lk:
            return self._val
227
228
229 class KeepClient(object):
230
231     # Default Keep server connection timeout:  2 seconds
232     # Default Keep server read timeout:       256 seconds
233     # Default Keep server bandwidth minimum:  32768 bytes per second
234     # Default Keep proxy connection timeout:  20 seconds
235     # Default Keep proxy read timeout:        256 seconds
236     # Default Keep proxy bandwidth minimum:   32768 bytes per second
237     DEFAULT_TIMEOUT = (2, 256, 32768)
238     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
239
class ThreadLimiter(object):
    """Limit the number of threads writing to Keep at once.

    This ensures that only a number of writer threads that could
    potentially achieve the desired replication level run at once.
    Once the desired replication level is achieved, queued threads
    are instructed not to run.

    Should be used in a "with" block.

    Defined inside the KeepClient class body.
    """
    def __init__(self, want_copies, max_service_replicas):
        self._started = 0
        self._want_copies = want_copies
        self._done = 0
        self._thread_failures = 0
        self._response = None
        self._start_lock = threading.Condition()
        if (not max_service_replicas) or (max_service_replicas >= want_copies):
            max_threads = 1
        else:
            # math.ceil() returns a float on Python 2; the semaphore
            # counter should be a plain integer.
            max_threads = int(math.ceil(float(want_copies) / max_service_replicas))
        _logger.debug("Limiter max threads is %d", max_threads)
        self._todo_lock = threading.Semaphore(max_threads)
        self._done_lock = threading.Lock()
        self._thread_failures_lock = threading.Lock()
        self._local = threading.local()

    def __enter__(self):
        self._start_lock.acquire()
        if getattr(self._local, 'sequence', None) is not None:
            # If the calling thread has used set_sequence(N), then
            # we wait here until N other threads have started.
            while self._started < self._local.sequence:
                self._start_lock.wait()
        # The semaphore is taken while _start_lock is still held, so
        # threads obtain their run slot in start order.
        self._todo_lock.acquire()
        self._started += 1
        self._start_lock.notifyAll()
        self._start_lock.release()
        return self

    def __exit__(self, type, value, traceback):
        # Each recorded failure lets exactly one more queued thread run.
        with self._thread_failures_lock:
            if self._thread_failures > 0:
                self._thread_failures -= 1
                self._todo_lock.release()

        # If work is finished, release all pending threads
        if not self.shall_i_proceed():
            self._todo_lock.release()

    def set_sequence(self, sequence):
        """Record, for the calling thread only, how many threads must start first."""
        self._local.sequence = sequence

    def shall_i_proceed(self):
        """
        Return true if the current thread should write to Keep.
        Return false otherwise.
        """
        with self._done_lock:
            return (self._done < self._want_copies)

    def save_response(self, response_body, replicas_stored):
        """
        Records a response body (a locator, possibly signed) returned by
        the Keep server, and the number of replicas it stored.

        replicas_stored == 0 is a failure notification: it bumps the
        failure counter and leaves the saved response untouched.
        """
        if replicas_stored == 0:
            # Failure notification, should start a new thread to try to reach full replication
            with self._thread_failures_lock:
                self._thread_failures += 1
        else:
            with self._done_lock:
                self._done += replicas_stored
                self._response = response_body

    def response(self):
        """Return the body from the response to a PUT request."""
        with self._done_lock:
            return self._response

    def done(self):
        """Return the total number of replicas successfully stored."""
        with self._done_lock:
            return self._done
324
class KeepService(object):
    """Make requests to a single Keep service, and track results.

    A KeepService is intended to last long enough to perform one
    transaction (GET or PUT) against one Keep service. This can
    involve calling either get() or put() multiple times in order
    to retry after transient failures. However, calling both get()
    and put() on a single instance -- or using the same instance
    to access two different Keep services -- will not produce
    sensible behavior.

    Defined inside the KeepClient class body.
    """

    # Exceptions that are recorded as a request failure rather than
    # propagated to the caller.
    HTTP_ERRORS = (
        socket.error,
        ssl.SSLError,
        arvados.errors.HttpError,
    )

    # NOTE: the default user_agent_pool is created once, when this
    # method is defined, so every instance that does not pass its own
    # pool shares the same queue of curl handles.
    def __init__(self, root, user_agent_pool=Queue.LifoQueue(),
                 upload_counter=None,
                 download_counter=None, **headers):
        """Set up a client for the service at URL prefix `root`.

        Extra keyword arguments are sent as HTTP headers on every
        request.  upload_counter / download_counter, when given, have
        add(nbytes) called after each successful transfer.
        """
        self.root = root
        self._user_agent_pool = user_agent_pool
        self._result = {'error': None}
        self._usable = True
        self._session = None
        self.get_headers = {'Accept': 'application/octet-stream'}
        self.get_headers.update(headers)
        self.put_headers = headers
        self.upload_counter = upload_counter
        self.download_counter = download_counter

    def usable(self):
        """Is it worth attempting a request?"""
        return self._usable

    def finished(self):
        """Did the request succeed or encounter permanent failure?"""
        return self._result['error'] == False or not self._usable

    def last_result(self):
        """Return the result dict of the most recent request."""
        return self._result

    def _get_user_agent(self):
        # Reuse a pooled curl handle if one is available.
        try:
            return self._user_agent_pool.get(False)
        except Queue.Empty:
            return pycurl.Curl()

    def _put_user_agent(self, ua):
        # Best effort: if the handle cannot be reset or pooled, drop it.
        try:
            ua.reset()
            self._user_agent_pool.put(ua, False)
        except Exception:
            ua.close()

    @staticmethod
    def _socket_open(family, socktype, protocol, address=None):
        """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
        s = socket.socket(family, socktype, protocol)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
        s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
        return s

    def get(self, locator, method="GET", timeout=None):
        """Fetch (GET) or probe (HEAD) the block named by locator.

        locator is a KeepLocator object.  Returns the block data for
        a successful GET whose md5 matches the locator, True for a
        successful HEAD, and None on any failure; details are
        available from last_result().
        """
        url = self.root + str(locator)
        _logger.debug("Request: %s %s", method, url)
        curl = self._get_user_agent()
        ok = None
        try:
            with timer.Timer() as t:
                self._headers = {}
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.get_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                if method == "HEAD":
                    curl.setopt(pycurl.NOBODY, True)
                self._setcurltimeouts(curl, timeout)

                try:
                    curl.perform()
                except Exception as e:
                    # Normalize any transport failure to an HttpError
                    # with status 0.
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }

            ok = retry.check_http_response_success(self._result['status_code'])
            if not ok:
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
        except self.HTTP_ERRORS as e:
            self._result = {
                'error': e,
            }
        self._usable = ok != False # still usable if ok is True or None
        if self._result.get('status_code', None):
            # The client worked well enough to get an HTTP status
            # code, so presumably any problems are just on the
            # server side and it's OK to reuse the client.
            self._put_user_agent(curl)
        else:
            # Don't return this client to the pool, in case it's
            # broken.
            curl.close()
        if not ok:
            # Log the method actually used (this path serves HEAD too).
            _logger.debug("Request fail: %s %s => %s: %s",
                          method, url, type(self._result['error']), str(self._result['error']))
            return None
        if method == "HEAD":
            # Response headers live under the 'headers' key of the
            # result, not at its top level.
            _logger.info("HEAD %s: %s bytes",
                         self._result['status_code'],
                         self._result['headers'].get('content-length'))
            return True

        _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
                     self._result['status_code'],
                     len(self._result['body']),
                     t.msecs,
                     (len(self._result['body'])/(1024.0*1024))/t.secs if t.secs > 0 else 0)

        if self.download_counter:
            self.download_counter.add(len(self._result['body']))
        resp_md5 = hashlib.md5(self._result['body']).hexdigest()
        if resp_md5 != locator.md5sum:
            _logger.warning("Checksum fail: md5(%s) = %s",
                            url, resp_md5)
            self._result['error'] = arvados.errors.HttpError(
                0, 'Checksum fail')
            return None
        return self._result['body']

    def put(self, hash_s, body, timeout=None):
        """PUT body to this service under the name hash_s.

        Returns True on success and False on failure; details are
        available from last_result().
        """
        url = self.root + hash_s
        _logger.debug("Request: PUT %s", url)
        curl = self._get_user_agent()
        ok = None
        try:
            with timer.Timer() as t:
                self._headers = {}
                body_reader = cStringIO.StringIO(body)
                response_body = cStringIO.StringIO()
                curl.setopt(pycurl.NOSIGNAL, 1)
                curl.setopt(pycurl.OPENSOCKETFUNCTION, self._socket_open)
                curl.setopt(pycurl.URL, url.encode('utf-8'))
                # Using UPLOAD tells cURL to wait for a "go ahead" from the
                # Keep server (in the form of a HTTP/1.1 "100 Continue"
                # response) instead of sending the request body immediately.
                # This allows the server to reject the request if the request
                # is invalid or the server is read-only, without waiting for
                # the client to send the entire block.
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(body))
                curl.setopt(pycurl.READFUNCTION, body_reader.read)
                curl.setopt(pycurl.HTTPHEADER, [
                    '{}: {}'.format(k,v) for k,v in self.put_headers.iteritems()])
                curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                self._setcurltimeouts(curl, timeout)
                try:
                    curl.perform()
                except Exception as e:
                    raise arvados.errors.HttpError(0, str(e))
                self._result = {
                    'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                    'body': response_body.getvalue(),
                    'headers': self._headers,
                    'error': False,
                }
            ok = retry.check_http_response_success(self._result['status_code'])
            if not ok:
                self._result['error'] = arvados.errors.HttpError(
                    self._result['status_code'],
                    self._headers.get('x-status-line', 'Error'))
        except self.HTTP_ERRORS as e:
            self._result = {
                'error': e,
            }
        self._usable = ok != False # still usable if ok is True or None
        if self._result.get('status_code', None):
            # Client is functional. See comment in get().
            self._put_user_agent(curl)
        else:
            curl.close()
        if not ok:
            _logger.debug("Request fail: PUT %s => %s: %s",
                          url, type(self._result['error']), str(self._result['error']))
            return False
        _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                     self._result['status_code'],
                     len(body),
                     t.msecs,
                     (len(body)/(1024.0*1024))/t.secs if t.secs > 0 else 0)
        if self.upload_counter:
            self.upload_counter.add(len(body))
        return True

    def _setcurltimeouts(self, curl, timeouts):
        """Apply timeouts to curl.

        timeouts may be falsy (leave curl untouched), a single number
        used for both connect and transfer, a 2-tuple (connect,
        transfer), or a 3-tuple (connect, transfer, minimum bandwidth
        in bytes per second).
        """
        if not timeouts:
            return
        elif isinstance(timeouts, tuple):
            if len(timeouts) == 2:
                conn_t, xfer_t = timeouts
                bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
            else:
                conn_t, xfer_t, bandwidth_bps = timeouts
        else:
            conn_t, xfer_t = (timeouts, timeouts)
            bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
        curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
        curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))

    def _headerfunction(self, header_line):
        """pycurl HEADERFUNCTION callback: collect headers in self._headers.

        Header names are lower-cased.  The most recent status line is
        stored under 'x-status-line'.
        """
        header_line = header_line.decode('iso-8859-1')
        if header_line.startswith('HTTP/'):
            # Checked first so that a later status line (for example
            # the final one after an interim "100 Continue") replaces
            # the earlier one instead of being folded into the
            # previous header.
            name = 'x-status-line'
            value = header_line
        elif ':' in header_line:
            name, value = header_line.split(':', 1)
            name = name.strip().lower()
            value = value.strip()
        elif not header_line.strip():
            # The blank line ending a header block carries no data.
            return
        elif self._headers:
            # Continuation of the previous header line.
            name = self._lastheadername
            value = self._headers[name] + ' ' + header_line.strip()
        else:
            _logger.error("Unexpected header line: %s", header_line)
            return
        self._lastheadername = name
        self._headers[name] = value
        # Returning None implies all bytes were written
567     
568
class KeepWriterQueue(Queue.Queue):
    """Work queue shared by writer threads, with replication bookkeeping.

    Tracks how many copies are wanted, how many were stored, and how
    many write attempts are currently permitted (``retries``).

    Defined inside the KeepClient class body.
    """
    def __init__(self, copies):
        # Queue.Queue is an old-style class on Python 2, so super()
        # cannot be used here; call the base initializer directly.
        Queue.Queue.__init__(self)
        self.wanted_copies = copies
        self.successful_copies = 0
        self.successful_copies_lock = threading.Lock()
        # Start with one permitted attempt per wanted copy.
        self.retries = copies
        self.retries_notification = threading.Condition()

    def write_success(self, replicas_nr):
        """Record that replicas_nr more copies were stored."""
        with self.successful_copies_lock:
            self.successful_copies += replicas_nr

    def write_fail(self, ks, status_code):
        """Permit one more attempt and wake one waiting worker.

        ks and status_code are accepted but not used.
        """
        with self.retries_notification:
            self.retries += 1
            self.retries_notification.notify()

    def pending_copies(self):
        """Return how many copies are still needed (may be negative)."""
        # Read under the same lock that guards the counter's updates.
        with self.successful_copies_lock:
            return self.wanted_copies - self.successful_copies
589     
590     
class KeepWriterThreadPool(object):
    """A set of writer threads fed from one KeepWriterQueue.

    Defined inside the KeepClient class body.
    """
    def __init__(self, data, data_hash, num_threads, copies=2):
        """Create num_threads workers (not yet started) that write data.

        copies is the number of replicas wanted.
        """
        self.wanted_copies = copies
        self.workers = []
        self.queue = KeepClient.KeepWriterQueue(copies)
        # Create the workers; start() launches them.
        for _ in range(num_threads):
            self.workers.append(KeepClient.KeepWriterThreadNew(self.queue, data, data_hash))

    def add_task(self, ks, service_root):
        """Queue a write to Keep service ks, whose root URL is service_root."""
        self.queue.put((ks, service_root))

    def successful_copies(self):
        """Return the number of copies the queue has recorded as stored."""
        return self.queue.successful_copies

    def start(self):
        """Start every worker thread."""
        for worker in self.workers:
            worker.start()
609     
610     
class KeepWriterThreadNew(threading.Thread):
    """Daemon worker that takes (service, service_root) tasks from a
    writer queue and PUTs the same data block to each service.

    Defined inside the KeepClient class body.
    """
    def __init__(self, queue, data, data_hash):
        super(KeepClient.KeepWriterThreadNew, self).__init__()
        self.queue = queue
        self.data = data
        self.data_hash = data_hash
        self.daemon = True

    def run(self):
        """Process tasks until the queue is empty.

        While copies are still pending, a task is attempted only when
        the queue permits an attempt (``retries`` > 0); otherwise the
        thread waits to be notified of a failure.  Once enough copies
        exist, remaining tasks are discarded without writing.
        """
        while not self.queue.empty():
            if self.queue.pending_copies() > 0:
                # Avoid overreplication, wait for some needed retry
                with self.queue.retries_notification:
                    if not self.queue.retries > 0:
                        self.queue.retries_notification.wait()
                        continue # try again when awake
                    self.queue.retries -= 1

                # Get to work
                service, service_root = self.queue.get()

                # Use the service just taken from the queue; this
                # thread has no `service` attribute of its own.
                success = bool(service.put(self.data_hash,
                                           self.data,
                                           timeout=None))
                result = service.last_result()
                if success:
                    _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                                  str(threading.current_thread()),
                                  self.data_hash,
                                  len(self.data),
                                  service_root)
                    try:
                        replicas_stored = int(result['headers']['x-keep-replicas-stored'])
                    except (KeyError, ValueError):
                        # Header missing or unparseable: count one copy.
                        replicas_stored = 1
                    self.queue.write_success(replicas_stored)
                else:
                    if result.get('status_code', None):
                        _logger.debug("Request fail: PUT %s => %s %s",
                                      self.data_hash,
                                      result['status_code'],
                                      result['body'])
                    self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
            else:
                # Remove the task from the queue anyways
                self.queue.get()
            # Mark as done so the queue can be join()ed
            self.queue.task_done()
659
660
class KeepWriterThread(threading.Thread):
    """
    Write a blob of data to the given Keep server. On success, call
    save_response() of the given ThreadLimiter to save the returned
    locator.

    Defined inside the KeepClient class body.
    """
    def __init__(self, keep_service, **kwargs):
        """Store the service and the keyword arguments used by run().

        run() reads 'thread_limiter', 'thread_sequence', 'data_hash',
        'data', 'service_root' and, optionally, 'timeout'.
        """
        super(KeepClient.KeepWriterThread, self).__init__()
        self.service = keep_service
        self.args = kwargs
        self._success = False

    def success(self):
        """Return True if this thread's PUT succeeded."""
        return self._success

    def run(self):
        limiter = self.args['thread_limiter']
        sequence = self.args['thread_sequence']
        if sequence is not None:
            limiter.set_sequence(sequence)
        with limiter:
            if not limiter.shall_i_proceed():
                # My turn arrived, but the job has been done without
                # me.
                return
            self.run_with_limiter(limiter)

    def run_with_limiter(self, limiter):
        """Attempt the PUT and report the outcome to limiter."""
        if self.service.finished():
            return
        _logger.debug("KeepWriterThread %s proceeding %s+%i %s",
                      str(threading.current_thread()),
                      self.args['data_hash'],
                      len(self.args['data']),
                      self.args['service_root'])
        self._success = bool(self.service.put(
            self.args['data_hash'],
            self.args['data'],
            timeout=self.args.get('timeout', None)))
        result = self.service.last_result()
        if self._success:
            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.args['data_hash'],
                          len(self.args['data']),
                          self.args['service_root'])
            # Tick the 'done' counter for the number of replica
            # reported stored by the server, for the case that
            # we're talking to a proxy or other backend that
            # stores to multiple copies for us.
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1
            limiter.save_response(result['body'].strip(), replicas_stored)
            return
        if result.get('status_code', None):
            _logger.debug("Request fail: PUT %s => %s %s",
                          self.args['data_hash'],
                          result['status_code'],
                          result['body'])
        # Notify the failure so that the Thread limiter allows
        # a new one to run.
        limiter.save_response(None, 0)
725
726
727     def __init__(self, api_client=None, proxy=None,
728                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
729                  api_token=None, local_store=None, block_cache=None,
730                  num_retries=0, session=None):
731         """Initialize a new KeepClient.
732
733         Arguments:
734         :api_client:
735           The API client to use to find Keep services.  If not
736           provided, KeepClient will build one from available Arvados
737           configuration.
738
739         :proxy:
740           If specified, this KeepClient will send requests to this Keep
741           proxy.  Otherwise, KeepClient will fall back to the setting of the
742           ARVADOS_KEEP_PROXY configuration setting.  If you want to ensure
743           KeepClient does not use a proxy, pass in an empty string.
744
745         :timeout:
746           The initial timeout (in seconds) for HTTP requests to Keep
747           non-proxy servers.  A tuple of three floats is interpreted as
748           (connection_timeout, read_timeout, minimum_bandwidth). A connection
749           will be aborted if the average traffic rate falls below
750           minimum_bandwidth bytes per second over an interval of read_timeout
751           seconds. Because timeouts are often a result of transient server
752           load, the actual connection timeout will be increased by a factor
753           of two on each retry.
754           Default: (2, 256, 32768).
755
756         :proxy_timeout:
757           The initial timeout (in seconds) for HTTP requests to
758           Keep proxies. A tuple of three floats is interpreted as
759           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
760           described above for adjusting connection timeouts on retry also
761           applies.
762           Default: (20, 256, 32768).
763
764         :api_token:
765           If you're not using an API client, but only talking
766           directly to a Keep proxy, this parameter specifies an API token
767           to authenticate Keep requests.  It is an error to specify both
768           api_client and api_token.  If you specify neither, KeepClient
769           will use one available from the Arvados configuration.
770
771         :local_store:
772           If specified, this KeepClient will bypass Keep
773           services, and save data to the named directory.  If unspecified,
774           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
775           environment variable.  If you want to ensure KeepClient does not
776           use local storage, pass in an empty string.  This is primarily
777           intended to mock a server for testing.
778
779         :num_retries:
780           The default number of times to retry failed requests.
781           This will be used as the default num_retries value when get() and
782           put() are called.  Default 0.
783         """
784         self.lock = threading.Lock()
785         if proxy is None:
786             proxy = config.get('ARVADOS_KEEP_PROXY')
787         if api_token is None:
788             if api_client is None:
789                 api_token = config.get('ARVADOS_API_TOKEN')
790             else:
791                 api_token = api_client.api_token
792         elif api_client is not None:
793             raise ValueError(
794                 "can't build KeepClient with both API client and token")
795         if local_store is None:
796             local_store = os.environ.get('KEEP_LOCAL_STORE')
797
798         self.block_cache = block_cache if block_cache else KeepBlockCache()
799         self.timeout = timeout
800         self.proxy_timeout = proxy_timeout
801         self._user_agent_pool = Queue.LifoQueue()
802         self.upload_counter = Counter()
803         self.download_counter = Counter()
804         self.put_counter = Counter()
805         self.get_counter = Counter()
806         self.hits_counter = Counter()
807         self.misses_counter = Counter()
808
809         if local_store:
810             self.local_store = local_store
811             self.get = self.local_store_get
812             self.put = self.local_store_put
813         else:
814             self.num_retries = num_retries
815             self.max_replicas_per_service = None
816             if proxy:
817                 if not proxy.endswith('/'):
818                     proxy += '/'
819                 self.api_token = api_token
820                 self._gateway_services = {}
821                 self._keep_services = [{
822                     'uuid': 'proxy',
823                     'service_type': 'proxy',
824                     '_service_root': proxy,
825                     }]
826                 self._writable_services = self._keep_services
827                 self.using_proxy = True
828                 self._static_services_list = True
829             else:
830                 # It's important to avoid instantiating an API client
831                 # unless we actually need one, for testing's sake.
832                 if api_client is None:
833                     api_client = arvados.api('v1')
834                 self.api_client = api_client
835                 self.api_token = api_client.api_token
836                 self._gateway_services = {}
837                 self._keep_services = None
838                 self._writable_services = None
839                 self.using_proxy = None
840                 self._static_services_list = False
841
842     def current_timeout(self, attempt_number):
843         """Return the appropriate timeout to use for this client.
844
845         The proxy timeout setting if the backend service is currently a proxy,
846         the regular timeout setting otherwise.  The `attempt_number` indicates
847         how many times the operation has been tried already (starting from 0
848         for the first try), and scales the connection timeout portion of the
849         return value accordingly.
850
851         """
852         # TODO(twp): the timeout should be a property of a
853         # KeepService, not a KeepClient. See #4488.
854         t = self.proxy_timeout if self.using_proxy else self.timeout
855         if len(t) == 2:
856             return (t[0] * (1 << attempt_number), t[1])
857         else:
858             return (t[0] * (1 << attempt_number), t[1], t[2])
859     def _any_nondisk_services(self, service_list):
860         return any(ks.get('service_type', 'disk') != 'disk'
861                    for ks in service_list)
862
863     def build_services_list(self, force_rebuild=False):
864         if (self._static_services_list or
865               (self._keep_services and not force_rebuild)):
866             return
867         with self.lock:
868             try:
869                 keep_services = self.api_client.keep_services().accessible()
870             except Exception:  # API server predates Keep services.
871                 keep_services = self.api_client.keep_disks().list()
872
873             # Gateway services are only used when specified by UUID,
874             # so there's nothing to gain by filtering them by
875             # service_type.
876             self._gateway_services = {ks['uuid']: ks for ks in
877                                       keep_services.execute()['items']}
878             if not self._gateway_services:
879                 raise arvados.errors.NoKeepServersError()
880
881             # Precompute the base URI for each service.
882             for r in self._gateway_services.itervalues():
883                 host = r['service_host']
884                 if not host.startswith('[') and host.find(':') >= 0:
885                     # IPv6 URIs must be formatted like http://[::1]:80/...
886                     host = '[' + host + ']'
887                 r['_service_root'] = "{}://{}:{:d}/".format(
888                     'https' if r['service_ssl_flag'] else 'http',
889                     host,
890                     r['service_port'])
891
892             _logger.debug(str(self._gateway_services))
893             self._keep_services = [
894                 ks for ks in self._gateway_services.itervalues()
895                 if not ks.get('service_type', '').startswith('gateway:')]
896             self._writable_services = [ks for ks in self._keep_services
897                                        if not ks.get('read_only')]
898
899             # For disk type services, max_replicas_per_service is 1
900             # It is unknown (unlimited) for other service types.
901             if self._any_nondisk_services(self._writable_services):
902                 self.max_replicas_per_service = None
903             else:
904                 self.max_replicas_per_service = 1
905
906     def _service_weight(self, data_hash, service_uuid):
907         """Compute the weight of a Keep service endpoint for a data
908         block with a known hash.
909
910         The weight is md5(h + u) where u is the last 15 characters of
911         the service endpoint's UUID.
912         """
913         return hashlib.md5(data_hash + service_uuid[-15:]).hexdigest()
914
915     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
916         """Return an array of Keep service endpoints, in the order in
917         which they should be probed when reading or writing data with
918         the given hash+hints.
919         """
920         self.build_services_list(force_rebuild)
921
922         sorted_roots = []
923         # Use the services indicated by the given +K@... remote
924         # service hints, if any are present and can be resolved to a
925         # URI.
926         for hint in locator.hints:
927             if hint.startswith('K@'):
928                 if len(hint) == 7:
929                     sorted_roots.append(
930                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
931                 elif len(hint) == 29:
932                     svc = self._gateway_services.get(hint[2:])
933                     if svc:
934                         sorted_roots.append(svc['_service_root'])
935
936         # Sort the available local services by weight (heaviest first)
937         # for this locator, and return their service_roots (base URIs)
938         # in that order.
939         use_services = self._keep_services
940         if need_writable:
941             use_services = self._writable_services
942         self.using_proxy = self._any_nondisk_services(use_services)
943         sorted_roots.extend([
944             svc['_service_root'] for svc in sorted(
945                 use_services,
946                 reverse=True,
947                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
948         _logger.debug("{}: {}".format(locator, sorted_roots))
949         return sorted_roots
950
951     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, **headers):
952         # roots_map is a dictionary, mapping Keep service root strings
953         # to KeepService objects.  Poll for Keep services, and add any
954         # new ones to roots_map.  Return the current list of local
955         # root strings.
956         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
957         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
958         for root in local_roots:
959             if root not in roots_map:
960                 roots_map[root] = self.KeepService(
961                     root, self._user_agent_pool,
962                     upload_counter=self.upload_counter,
963                     download_counter=self.download_counter,
964                     **headers)
965         return local_roots
966
967     @staticmethod
968     def _check_loop_result(result):
969         # KeepClient RetryLoops should save results as a 2-tuple: the
970         # actual result of the request, and the number of servers available
971         # to receive the request this round.
972         # This method returns True if there's a real result, False if
973         # there are no more servers available, otherwise None.
974         if isinstance(result, Exception):
975             return None
976         result, tried_server_count = result
977         if (result is not None) and (result is not False):
978             return True
979         elif tried_server_count < 1:
980             _logger.info("No more Keep services to try; giving up")
981             return False
982         else:
983             return None
984
985     def get_from_cache(self, loc):
986         """Fetch a block only if is in the cache, otherwise return None."""
987         slot = self.block_cache.get(loc)
988         if slot is not None and slot.ready.is_set():
989             return slot.get()
990         else:
991             return None
992
993     @retry.retry_method
994     def head(self, loc_s, num_retries=None):
995         return self._get_or_head(loc_s, method="HEAD", num_retries=num_retries)
996
997     @retry.retry_method
998     def get(self, loc_s, num_retries=None):
999         return self._get_or_head(loc_s, method="GET", num_retries=num_retries)
1000
1001     def _get_or_head(self, loc_s, method="GET", num_retries=None):
1002         """Get data from Keep.
1003
1004         This method fetches one or more blocks of data from Keep.  It
1005         sends a request each Keep service registered with the API
1006         server (or the proxy provided when this client was
1007         instantiated), then each service named in location hints, in
1008         sequence.  As soon as one service provides the data, it's
1009         returned.
1010
1011         Arguments:
1012         * loc_s: A string of one or more comma-separated locators to fetch.
1013           This method returns the concatenation of these blocks.
1014         * num_retries: The number of times to retry GET requests to
1015           *each* Keep server if it returns temporary failures, with
1016           exponential backoff.  Note that, in each loop, the method may try
1017           to fetch data from every available Keep service, along with any
1018           that are named in location hints in the locator.  The default value
1019           is set when the KeepClient is initialized.
1020         """
1021         if ',' in loc_s:
1022             return ''.join(self.get(x) for x in loc_s.split(','))
1023
1024         self.get_counter.add(1)
1025
1026         locator = KeepLocator(loc_s)
1027         if method == "GET":
1028             slot, first = self.block_cache.reserve_cache(locator.md5sum)
1029             if not first:
1030                 self.hits_counter.add(1)
1031                 v = slot.get()
1032                 return v
1033
1034         self.misses_counter.add(1)
1035
1036         # If the locator has hints specifying a prefix (indicating a
1037         # remote keepproxy) or the UUID of a local gateway service,
1038         # read data from the indicated service(s) instead of the usual
1039         # list of local disk services.
1040         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1041                       for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1042         hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1043                            for hint in locator.hints if (
1044                                    hint.startswith('K@') and
1045                                    len(hint) == 29 and
1046                                    self._gateway_services.get(hint[2:])
1047                                    )])
1048         # Map root URLs to their KeepService objects.
1049         roots_map = {
1050             root: self.KeepService(root, self._user_agent_pool,
1051                                    upload_counter=self.upload_counter,
1052                                    download_counter=self.download_counter)
1053             for root in hint_roots
1054         }
1055
1056         # See #3147 for a discussion of the loop implementation.  Highlights:
1057         # * Refresh the list of Keep services after each failure, in case
1058         #   it's being updated.
1059         # * Retry until we succeed, we're out of retries, or every available
1060         #   service has returned permanent failure.
1061         sorted_roots = []
1062         roots_map = {}
1063         blob = None
1064         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1065                                backoff_start=2)
1066         for tries_left in loop:
1067             try:
1068                 sorted_roots = self.map_new_services(
1069                     roots_map, locator,
1070                     force_rebuild=(tries_left < num_retries),
1071                     need_writable=False)
1072             except Exception as error:
1073                 loop.save_result(error)
1074                 continue
1075
1076             # Query KeepService objects that haven't returned
1077             # permanent failure, in our specified shuffle order.
1078             services_to_try = [roots_map[root]
1079                                for root in sorted_roots
1080                                if roots_map[root].usable()]
1081             for keep_service in services_to_try:
1082                 blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1083                 if blob is not None:
1084                     break
1085             loop.save_result((blob, len(services_to_try)))
1086
1087         # Always cache the result, then return it if we succeeded.
1088         if method == "GET":
1089             slot.set(blob)
1090             self.block_cache.cap_cache()
1091         if loop.success():
1092             if method == "HEAD":
1093                 return True
1094             else:
1095                 return blob
1096
1097         # Q: Including 403 is necessary for the Keep tests to continue
1098         # passing, but maybe they should expect KeepReadError instead?
1099         not_founds = sum(1 for key in sorted_roots
1100                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1101         service_errors = ((key, roots_map[key].last_result()['error'])
1102                           for key in sorted_roots)
1103         if not roots_map:
1104             raise arvados.errors.KeepReadError(
1105                 "failed to read {}: no Keep services available ({})".format(
1106                     loc_s, loop.last_result()))
1107         elif not_founds == len(sorted_roots):
1108             raise arvados.errors.NotFoundError(
1109                 "{} not found".format(loc_s), service_errors)
1110         else:
1111             raise arvados.errors.KeepReadError(
1112                 "failed to read {}".format(loc_s), service_errors, label="service")
1113
1114     @retry.retry_method
1115     def put(self, data, copies=2, num_retries=None):
1116         """Save data in Keep.
1117
1118         This method will get a list of Keep services from the API server, and
1119         send the data to each one simultaneously in a new thread.  Once the
1120         uploads are finished, if enough copies are saved, this method returns
1121         the most recent HTTP response body.  If requests fail to upload
1122         enough copies, this method raises KeepWriteError.
1123
1124         Arguments:
1125         * data: The string of data to upload.
1126         * copies: The number of copies that the user requires be saved.
1127           Default 2.
1128         * num_retries: The number of times to retry PUT requests to
1129           *each* Keep server if it returns temporary failures, with
1130           exponential backoff.  The default value is set when the
1131           KeepClient is initialized.
1132         """
1133
1134         if isinstance(data, unicode):
1135             data = data.encode("ascii")
1136         elif not isinstance(data, str):
1137             raise arvados.errors.ArgumentError("Argument 'data' to KeepClient.put is not type 'str'")
1138
1139         self.put_counter.add(1)
1140
1141         data_hash = hashlib.md5(data).hexdigest()
1142         loc_s = data_hash + '+' + str(len(data))
1143         if copies < 1:
1144             return loc_s
1145         locator = KeepLocator(loc_s)
1146
1147         headers = {}
1148         # Tell the proxy how many copies we want it to store
1149         headers['X-Keep-Desired-Replication'] = str(copies)
1150         roots_map = {}
1151         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1152                                backoff_start=2)
1153         done = 0
1154         for tries_left in loop:
1155             try:
1156                 sorted_roots = self.map_new_services(
1157                     roots_map, locator,
1158                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
1159             except Exception as error:
1160                 loop.save_result(error)
1161                 continue
1162
1163             thread_limiter = KeepClient.ThreadLimiter(
1164                 copies - done, self.max_replicas_per_service)
1165             threads = []
1166             for service_root, ks in [(root, roots_map[root])
1167                                      for root in sorted_roots]:
1168                 if ks.finished():
1169                     continue
1170                 t = KeepClient.KeepWriterThread(
1171                     ks,
1172                     data=data,
1173                     data_hash=data_hash,
1174                     service_root=service_root,
1175                     thread_limiter=thread_limiter,
1176                     timeout=self.current_timeout(num_retries-tries_left),
1177                     thread_sequence=len(threads))
1178                 t.start()
1179                 threads.append(t)
1180             for t in threads:
1181                 t.join()
1182             done += thread_limiter.done()
1183             loop.save_result((done >= copies, len(threads)))
1184
1185         if loop.success():
1186             return thread_limiter.response()
1187         if not roots_map:
1188             raise arvados.errors.KeepWriteError(
1189                 "failed to write {}: no Keep services available ({})".format(
1190                     data_hash, loop.last_result()))
1191         else:
1192             service_errors = ((key, roots_map[key].last_result()['error'])
1193                               for key in sorted_roots
1194                               if roots_map[key].last_result()['error'])
1195             raise arvados.errors.KeepWriteError(
1196                 "failed to write {} (wanted {} copies but wrote {})".format(
1197                     data_hash, copies, thread_limiter.done()), service_errors, label="service")
1198
1199     def local_store_put(self, data, copies=1, num_retries=None):
1200         """A stub for put().
1201
1202         This method is used in place of the real put() method when
1203         using local storage (see constructor's local_store argument).
1204
1205         copies and num_retries arguments are ignored: they are here
1206         only for the sake of offering the same call signature as
1207         put().
1208
1209         Data stored this way can be retrieved via local_store_get().
1210         """
1211         md5 = hashlib.md5(data).hexdigest()
1212         locator = '%s+%d' % (md5, len(data))
1213         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
1214             f.write(data)
1215         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1216                   os.path.join(self.local_store, md5))
1217         return locator
1218
1219     def local_store_get(self, loc_s, num_retries=None):
1220         """Companion to local_store_put()."""
1221         try:
1222             locator = KeepLocator(loc_s)
1223         except ValueError:
1224             raise arvados.errors.NotFoundError(
1225                 "Invalid data locator: '%s'" % loc_s)
1226         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1227             return ''
1228         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
1229             return f.read()
1230
1231     def is_cached(self, locator):
1232         return self.block_cache.reserve_cache(expect_hash)