Merge branch '19362-all-webdav-via-sitefs'
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import io
19 import logging
20 import math
21 import os
22 import pycurl
23 import queue
24 import re
25 import socket
26 import ssl
27 import sys
28 import threading
29 from . import timer
30 import urllib.parse
31
32 if sys.version_info >= (3, 0):
33     from io import BytesIO
34 else:
35     from cStringIO import StringIO as BytesIO
36
37 import arvados
38 import arvados.config as config
39 import arvados.errors
40 import arvados.retry as retry
41 import arvados.util
42
# Module-wide logger; child of the "arvados" logger hierarchy.
_logger = logging.getLogger('arvados.keep')
# Lazily-built singleton used by the deprecated Keep class below.
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
# These constants are used by KeepClient.KeepService when configuring
# keepalive options on the sockets it opens for pycurl.
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
56
57
class KeepLocator(object):
    """Parse and manipulate a Keep block locator string.

    A locator has the form "<md5sum>[+<size>][+<hint>...]"; an "A..."
    hint carries a permission signature and hex expiry timestamp.
    """

    # Epoch reference used to turn expiry datetimes back into Unix
    # timestamps in permission_hint().
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        """Parse locator_str; raises ValueError on malformed input."""
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # Bare locator with no size component.
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<expiry>" hint, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        try:
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            # A hint with no '@' makes the 2-way unpack above raise
            # ValueError (not IndexError), so catch both and report
            # which hint failed to parse.
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        Compares perm_expiry against as_of_dt, defaulting to the
        current UTC time.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # perm_expiry is naive UTC (built via utcfromtimestamp), so
            # the comparison baseline must also be UTC; datetime.now()
            # would incorrectly use local time.
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
140
141
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    # Configuration snapshot that produced the cached global client.
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it if config changed."""
        global global_client_object
        # Previously, KeepClient would change its behavior at runtime based
        # on these configuration settings.  We simulate that behavior here
        # by checking the values and returning a new KeepClient if any of
        # them have changed.
        current_key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is None or current_key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = current_key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared client."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the shared client."""
        return Keep.global_client_object().put(data, **kwargs)
class KeepBlockCache(object):
    """In-memory cache of Keep block contents.

    Slots are kept most-recently-used-first; once the total cached size
    exceeds cache_max, ready slots are evicted from the cold end.
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []            # CacheSlots, most recently used first
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """Holder for one block that lets readers wait for a pending fetch."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until content is available, then return it."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Store the block content and wake any waiting readers."""
            self.content = value
            self.ready.set()

        def size(self):
            """Cached byte count (0 while pending, or after a read error)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache
                           if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                evicted = False
                # Evict the least recently used slot that is ready.
                for i in range(len(self._cache)-1, -1, -1):
                    if self._cache[i].ready.is_set():
                        # Subtract the evicted size instead of re-summing
                        # the whole list each iteration (was O(n^2)).
                        sm -= self._cache[i].size()
                        del self._cache[i]
                        evicted = True
                        break
                if not evicted:
                    # Every remaining slot is still pending; stop rather
                    # than spin forever while holding the lock.
                    break

    def _get(self, locator):
        # Test if the locator is already in the cache.  Caller must hold
        # self._cache_lock.
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the cached slot for locator, or None.  Thread-safe."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
247
class Counter(object):
    """A thread-safe additive counter."""

    def __init__(self, v=0):
        # The lock serializes updates from concurrent transfer threads.
        self._lock = threading.Lock()
        self._value = v

    def add(self, v):
        """Atomically add v (may be negative) to the counter."""
        with self._lock:
            self._value += v

    def get(self):
        """Return a consistent snapshot of the current value."""
        with self._lock:
            return self._value
260
261
262 class KeepClient(object):
263
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    # Each tuple is (connect timeout seconds, transfer timeout seconds,
    # minimum bandwidth in bytes/second) as consumed by
    # KeepService._setcurltimeouts().
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
272
273
274     class KeepService(object):
275         """Make requests to a single Keep service, and track results.
276
277         A KeepService is intended to last long enough to perform one
278         transaction (GET or PUT) against one Keep service. This can
279         involve calling either get() or put() multiple times in order
280         to retry after transient failures. However, calling both get()
281         and put() on a single instance -- or using the same instance
282         to access two different Keep services -- will not produce
283         sensible behavior.
284         """
285
286         HTTP_ERRORS = (
287             socket.error,
288             ssl.SSLError,
289             arvados.errors.HttpError,
290         )
291
292         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
293                      upload_counter=None,
294                      download_counter=None,
295                      headers={},
296                      insecure=False):
297             self.root = root
298             self._user_agent_pool = user_agent_pool
299             self._result = {'error': None}
300             self._usable = True
301             self._session = None
302             self._socket = None
303             self.get_headers = {'Accept': 'application/octet-stream'}
304             self.get_headers.update(headers)
305             self.put_headers = headers
306             self.upload_counter = upload_counter
307             self.download_counter = download_counter
308             self.insecure = insecure
309
310         def usable(self):
311             """Is it worth attempting a request?"""
312             return self._usable
313
314         def finished(self):
315             """Did the request succeed or encounter permanent failure?"""
316             return self._result['error'] == False or not self._usable
317
318         def last_result(self):
319             return self._result
320
321         def _get_user_agent(self):
322             try:
323                 return self._user_agent_pool.get(block=False)
324             except queue.Empty:
325                 return pycurl.Curl()
326
327         def _put_user_agent(self, ua):
328             try:
329                 ua.reset()
330                 self._user_agent_pool.put(ua, block=False)
331             except:
332                 ua.close()
333
334         def _socket_open(self, *args, **kwargs):
335             if len(args) + len(kwargs) == 2:
336                 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
337             else:
338                 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
339
340         def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
341             return self._socket_open_pycurl_7_21_5(
342                 purpose=None,
343                 address=collections.namedtuple(
344                     'Address', ['family', 'socktype', 'protocol', 'addr'],
345                 )(family, socktype, protocol, address))
346
        def _socket_open_pycurl_7_21_5(self, purpose, address):
            """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
            s = socket.socket(address.family, address.socktype, address.protocol)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            # Will throw invalid protocol error on mac. This test prevents that.
            if hasattr(socket, 'TCP_KEEPIDLE'):
                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
            # TCP_KEEPINTVL is set unconditionally: missing constants
            # were monkey-patched in at module load time on darwin.
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
            # Remember the socket so get()/put() can close it in their
            # finally blocks once the transfer finishes.
            self._socket = s
            return s
357
358         def get(self, locator, method="GET", timeout=None):
359             # locator is a KeepLocator object.
360             url = self.root + str(locator)
361             _logger.debug("Request: %s %s", method, url)
362             curl = self._get_user_agent()
363             ok = None
364             try:
365                 with timer.Timer() as t:
366                     self._headers = {}
367                     response_body = BytesIO()
368                     curl.setopt(pycurl.NOSIGNAL, 1)
369                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
370                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
371                     curl.setopt(pycurl.URL, url.encode('utf-8'))
372                     curl.setopt(pycurl.HTTPHEADER, [
373                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
374                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
375                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
376                     if self.insecure:
377                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
378                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
379                     else:
380                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
381                     if method == "HEAD":
382                         curl.setopt(pycurl.NOBODY, True)
383                     self._setcurltimeouts(curl, timeout, method=="HEAD")
384
385                     try:
386                         curl.perform()
387                     except Exception as e:
388                         raise arvados.errors.HttpError(0, str(e))
389                     finally:
390                         if self._socket:
391                             self._socket.close()
392                             self._socket = None
393                     self._result = {
394                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
395                         'body': response_body.getvalue(),
396                         'headers': self._headers,
397                         'error': False,
398                     }
399
400                 ok = retry.check_http_response_success(self._result['status_code'])
401                 if not ok:
402                     self._result['error'] = arvados.errors.HttpError(
403                         self._result['status_code'],
404                         self._headers.get('x-status-line', 'Error'))
405             except self.HTTP_ERRORS as e:
406                 self._result = {
407                     'error': e,
408                 }
409             self._usable = ok != False
410             if self._result.get('status_code', None):
411                 # The client worked well enough to get an HTTP status
412                 # code, so presumably any problems are just on the
413                 # server side and it's OK to reuse the client.
414                 self._put_user_agent(curl)
415             else:
416                 # Don't return this client to the pool, in case it's
417                 # broken.
418                 curl.close()
419             if not ok:
420                 _logger.debug("Request fail: GET %s => %s: %s",
421                               url, type(self._result['error']), str(self._result['error']))
422                 return None
423             if method == "HEAD":
424                 _logger.info("HEAD %s: %s bytes",
425                          self._result['status_code'],
426                          self._result.get('content-length'))
427                 if self._result['headers'].get('x-keep-locator'):
428                     # This is a response to a remote block copy request, return
429                     # the local copy block locator.
430                     return self._result['headers'].get('x-keep-locator')
431                 return True
432
433             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
434                          self._result['status_code'],
435                          len(self._result['body']),
436                          t.msecs,
437                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
438
439             if self.download_counter:
440                 self.download_counter.add(len(self._result['body']))
441             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
442             if resp_md5 != locator.md5sum:
443                 _logger.warning("Checksum fail: md5(%s) = %s",
444                                 url, resp_md5)
445                 self._result['error'] = arvados.errors.HttpError(
446                     0, 'Checksum fail')
447                 return None
448             return self._result['body']
449
        def put(self, hash_s, body, timeout=None, headers={}):
            """Upload one block and record the outcome in last_result().

            :hash_s: the block's md5 hex digest (used as request path).
            :body: the block data (bytes).
            :timeout: timeout spec passed to _setcurltimeouts().
            :headers: per-request headers merged over the instance-wide
              put headers (e.g. storage class hints).

            Returns True on success, False on failure.
            """
            put_headers = copy.copy(self.put_headers)
            put_headers.update(headers)
            url = self.root + hash_s
            _logger.debug("Request: PUT %s", url)
            curl = self._get_user_agent()
            ok = None
            try:
                with timer.Timer() as t:
                    self._headers = {}
                    body_reader = BytesIO(body)
                    response_body = BytesIO()
                    curl.setopt(pycurl.NOSIGNAL, 1)
                    curl.setopt(pycurl.OPENSOCKETFUNCTION,
                                lambda *args, **kwargs: self._socket_open(*args, **kwargs))
                    curl.setopt(pycurl.URL, url.encode('utf-8'))
                    # Using UPLOAD tells cURL to wait for a "go ahead" from the
                    # Keep server (in the form of a HTTP/1.1 "100 Continue"
                    # response) instead of sending the request body immediately.
                    # This allows the server to reject the request if the request
                    # is invalid or the server is read-only, without waiting for
                    # the client to send the entire block.
                    curl.setopt(pycurl.UPLOAD, True)
                    curl.setopt(pycurl.INFILESIZE, len(body))
                    curl.setopt(pycurl.READFUNCTION, body_reader.read)
                    curl.setopt(pycurl.HTTPHEADER, [
                        '{}: {}'.format(k,v) for k,v in put_headers.items()])
                    curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
                    curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
                    if self.insecure:
                        curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                        curl.setopt(pycurl.SSL_VERIFYHOST, 0)
                    else:
                        curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
                    self._setcurltimeouts(curl, timeout)
                    try:
                        curl.perform()
                    except Exception as e:
                        raise arvados.errors.HttpError(0, str(e))
                    finally:
                        # Close the socket opened by _socket_open()
                        # whether or not the transfer succeeded.
                        if self._socket:
                            self._socket.close()
                            self._socket = None
                    self._result = {
                        'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
                        'body': response_body.getvalue().decode('utf-8'),
                        'headers': self._headers,
                        'error': False,
                    }
                ok = retry.check_http_response_success(self._result['status_code'])
                if not ok:
                    self._result['error'] = arvados.errors.HttpError(
                        self._result['status_code'],
                        self._headers.get('x-status-line', 'Error'))
            except self.HTTP_ERRORS as e:
                self._result = {
                    'error': e,
                }
            self._usable = ok != False # still usable if ok is True or None
            if self._result.get('status_code', None):
                # Client is functional. See comment in get().
                self._put_user_agent(curl)
            else:
                curl.close()
            if not ok:
                _logger.debug("Request fail: PUT %s => %s: %s",
                              url, type(self._result['error']), str(self._result['error']))
                return False
            _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
                         self._result['status_code'],
                         len(body),
                         t.msecs,
                         1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
            if self.upload_counter:
                self.upload_counter.add(len(body))
            return True
526
527         def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
528             if not timeouts:
529                 return
530             elif isinstance(timeouts, tuple):
531                 if len(timeouts) == 2:
532                     conn_t, xfer_t = timeouts
533                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
534                 else:
535                     conn_t, xfer_t, bandwidth_bps = timeouts
536             else:
537                 conn_t, xfer_t = (timeouts, timeouts)
538                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
539             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
540             if not ignore_bandwidth:
541                 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
542                 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
543
        def _headerfunction(self, header_line):
            # pycurl feeds us one raw response-header line per call.
            if isinstance(header_line, bytes):
                header_line = header_line.decode('iso-8859-1')
            if ':' in header_line:
                # Ordinary "Name: value" header; names are normalized
                # to lowercase for lookup.
                name, value = header_line.split(':', 1)
                name = name.strip().lower()
                value = value.strip()
            elif self._headers:
                # Continuation (folded) header line: append it to the
                # value of the header seen on the previous call.
                name = self._lastheadername
                value = self._headers[name] + ' ' + header_line.strip()
            elif header_line.startswith('HTTP/'):
                # The status line arrives first, so it is only accepted
                # while self._headers is still empty.
                name = 'x-status-line'
                value = header_line
            else:
                _logger.error("Unexpected header line: %s", header_line)
                return
            self._lastheadername = name
            self._headers[name] = value
            # Returning None implies all bytes were written
562             # Returning None implies all bytes were written
563
564
    class KeepWriterQueue(queue.Queue):
        """Work queue shared by KeepWriterThreads for a single put().

        Tracks how many replicas have been written and which storage
        classes have been confirmed, so worker threads know when to
        keep retrying and when the request is satisfied.
        """

        def __init__(self, copies, classes=[]):
            queue.Queue.__init__(self) # Old-style superclass
            self.wanted_copies = copies
            self.wanted_storage_classes = classes
            self.successful_copies = 0
            self.confirmed_storage_classes = {}
            self.response = None
            # Cleared when a service reports no class confirmations.
            self.storage_classes_tracking = True
            self.queue_data_lock = threading.RLock()
            self.pending_tries = max(copies, len(classes))
            self.pending_tries_notification = threading.Condition()

        def write_success(self, response, replicas_nr, classes_confirmed):
            # Called by a worker thread after a successful PUT.
            with self.queue_data_lock:
                self.successful_copies += replicas_nr
                if classes_confirmed is None:
                    self.storage_classes_tracking = False
                elif self.storage_classes_tracking:
                    for st_class, st_copies in classes_confirmed.items():
                        try:
                            self.confirmed_storage_classes[st_class] += st_copies
                        except KeyError:
                            self.confirmed_storage_classes[st_class] = st_copies
                    self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
                self.response = response
            with self.pending_tries_notification:
                self.pending_tries_notification.notify_all()

        def write_fail(self, ks):
            # Called by a worker thread after a failed PUT: allow one
            # more attempt against another service.
            with self.pending_tries_notification:
                self.pending_tries += 1
                self.pending_tries_notification.notify()

        def pending_copies(self):
            # Replicas still needed to satisfy the request.
            with self.queue_data_lock:
                return self.wanted_copies - self.successful_copies

        def satisfied_classes(self):
            # Storage classes fully satisfied so far, or None when
            # class tracking has been disabled.
            with self.queue_data_lock:
                if not self.storage_classes_tracking:
                    # Notifies disabled storage classes expectation to
                    # the outer loop.
                    return None
            return list(set(self.wanted_storage_classes) - set(self.pending_classes()))

        def pending_classes(self):
            # Wanted storage classes not yet confirmed on enough copies.
            with self.queue_data_lock:
                if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
                    return []
                unsatisfied_classes = copy.copy(self.wanted_storage_classes)
                for st_class, st_copies in self.confirmed_storage_classes.items():
                    if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
                        unsatisfied_classes.remove(st_class)
                return unsatisfied_classes

        def get_next_task(self):
            # Block until there is a service worth trying, the request
            # is satisfied, or the queue is exhausted (queue.Empty).
            with self.pending_tries_notification:
                while True:
                    if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
                        # This notify_all() is unnecessary --
                        # write_success() already called notify_all()
                        # when pending<1 became true, so it's not
                        # possible for any other thread to be in
                        # wait() now -- but it's cheap insurance
                        # against deadlock so we do it anyway:
                        self.pending_tries_notification.notify_all()
                        # Drain the queue and then raise Queue.Empty
                        while True:
                            self.get_nowait()
                            self.task_done()
                    elif self.pending_tries > 0:
                        service, service_root = self.get_nowait()
                        if service.finished():
                            self.task_done()
                            continue
                        self.pending_tries -= 1
                        return service, service_root
                    elif self.empty():
                        self.pending_tries_notification.notify_all()
                        raise queue.Empty
                    else:
                        self.pending_tries_notification.wait()
648
649
650     class KeepWriterThreadPool(object):
651         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
652             self.total_task_nr = 0
653             if (not max_service_replicas) or (max_service_replicas >= copies):
654                 num_threads = 1
655             else:
656                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
657             _logger.debug("Pool max threads is %d", num_threads)
658             self.workers = []
659             self.queue = KeepClient.KeepWriterQueue(copies, classes)
660             # Create workers
661             for _ in range(num_threads):
662                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
663                 self.workers.append(w)
664
665         def add_task(self, ks, service_root):
666             self.queue.put((ks, service_root))
667             self.total_task_nr += 1
668
669         def done(self):
670             return self.queue.successful_copies, self.queue.satisfied_classes()
671
672         def join(self):
673             # Start workers
674             for worker in self.workers:
675                 worker.start()
676             # Wait for finished work
677             self.queue.join()
678
679         def response(self):
680             return self.queue.response
681
682
    class KeepWriterThread(threading.Thread):
        """Daemon worker that pulls write tasks off a KeepWriterQueue."""

        # Raised internally to signal an ordinary, already-reported
        # PUT failure (as opposed to an unexpected exception).
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            # Keep taking tasks until get_next_task() raises queue.Empty
            # (request satisfied or services exhausted).
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    if not isinstance(e, self.TaskFailed):
                        # TaskFailed is an expected, handled failure;
                        # anything else deserves a traceback.
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            # Attempt one PUT to `service`, requesting any storage
            # classes that are still unconfirmed.
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                # Header missing or malformed: assume a single replica.
                replicas_stored = 1

            # Parse "class1=2, class2=1" style confirmations, if present.
            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
753
754
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        NOTE(review): the `session` argument is accepted but never
        referenced in this constructor.
        """
        self.lock = threading.Lock()
        # ARVADOS_KEEP_SERVICES takes precedence over ARVADOS_KEEP_PROXY.
        if proxy is None:
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        self._user_agent_pool = queue.LifoQueue()
        # Cumulative request/transfer statistics for this client.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            # Local-store mode: route the public API to the local stubs.
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                # Build a static service list from the configured proxy
                # URI(s); no API-server service lookups will be made.
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    # Cluster-wide default storage classes; used by put()
                    # when the caller does not specify any.
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
893
894     def current_timeout(self, attempt_number):
895         """Return the appropriate timeout to use for this client.
896
897         The proxy timeout setting if the backend service is currently a proxy,
898         the regular timeout setting otherwise.  The `attempt_number` indicates
899         how many times the operation has been tried already (starting from 0
900         for the first try), and scales the connection timeout portion of the
901         return value accordingly.
902
903         """
904         # TODO(twp): the timeout should be a property of a
905         # KeepService, not a KeepClient. See #4488.
906         t = self.proxy_timeout if self.using_proxy else self.timeout
907         if len(t) == 2:
908             return (t[0] * (1 << attempt_number), t[1])
909         else:
910             return (t[0] * (1 << attempt_number), t[1], t[2])
911     def _any_nondisk_services(self, service_list):
912         return any(ks.get('service_type', 'disk') != 'disk'
913                    for ks in service_list)
914
    def build_services_list(self, force_rebuild=False):
        """Populate the cached Keep service lists from the API server.

        No-op when a static (proxy) service list is in use, or when the
        list is already populated and force_rebuild is false.  Fills in
        self._gateway_services, self._keep_services and
        self._writable_services, and sets self.max_replicas_per_service.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Gateway-type services are excluded from normal probe order.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
957
958     def _service_weight(self, data_hash, service_uuid):
959         """Compute the weight of a Keep service endpoint for a data
960         block with a known hash.
961
962         The weight is md5(h + u) where u is the last 15 characters of
963         the service endpoint's UUID.
964         """
965         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
966
    def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
        """Return an array of Keep service endpoints, in the order in
        which they should be probed when reading or writing data with
        the given hash+hints.
        """
        self.build_services_list(force_rebuild)

        sorted_roots = []
        # Use the services indicated by the given +K@... remote
        # service hints, if any are present and can be resolved to a
        # URI.
        for hint in locator.hints:
            if hint.startswith('K@'):
                if len(hint) == 7:
                    # +K@<5-char cluster id>: remote cluster keepproxy.
                    sorted_roots.append(
                        "https://keep.{}.arvadosapi.com/".format(hint[2:]))
                elif len(hint) == 29:
                    # +K@<service uuid>: a specific local gateway service.
                    svc = self._gateway_services.get(hint[2:])
                    if svc:
                        sorted_roots.append(svc['_service_root'])

        # Sort the available local services by weight (heaviest first)
        # for this locator, and return their service_roots (base URIs)
        # in that order.
        use_services = self._keep_services
        if need_writable:
            use_services = self._writable_services
        self.using_proxy = self._any_nondisk_services(use_services)
        sorted_roots.extend([
            svc['_service_root'] for svc in sorted(
                use_services,
                reverse=True,
                key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
        _logger.debug("{}: {}".format(locator, sorted_roots))
        return sorted_roots
1002
1003     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1004         # roots_map is a dictionary, mapping Keep service root strings
1005         # to KeepService objects.  Poll for Keep services, and add any
1006         # new ones to roots_map.  Return the current list of local
1007         # root strings.
1008         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1009         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1010         for root in local_roots:
1011             if root not in roots_map:
1012                 roots_map[root] = self.KeepService(
1013                     root, self._user_agent_pool,
1014                     upload_counter=self.upload_counter,
1015                     download_counter=self.download_counter,
1016                     headers=headers,
1017                     insecure=self.insecure)
1018         return local_roots
1019
1020     @staticmethod
1021     def _check_loop_result(result):
1022         # KeepClient RetryLoops should save results as a 2-tuple: the
1023         # actual result of the request, and the number of servers available
1024         # to receive the request this round.
1025         # This method returns True if there's a real result, False if
1026         # there are no more servers available, otherwise None.
1027         if isinstance(result, Exception):
1028             return None
1029         result, tried_server_count = result
1030         if (result is not None) and (result is not False):
1031             return True
1032         elif tried_server_count < 1:
1033             _logger.info("No more Keep services to try; giving up")
1034             return False
1035         else:
1036             return None
1037
1038     def get_from_cache(self, loc_s):
1039         """Fetch a block only if is in the cache, otherwise return None."""
1040         locator = KeepLocator(loc_s)
1041         slot = self.block_cache.get(locator.md5sum)
1042         if slot is not None and slot.ready.is_set():
1043             return slot.get()
1044         else:
1045             return None
1046
1047     def refresh_signature(self, loc):
1048         """Ask Keep to get the remote block and return its local signature"""
1049         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1050         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1051
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Send a HEAD request for a block (e.g. to check existence or
        refresh a signature) without fetching its data.

        Accepts the same keyword arguments as get(); delegates to
        _get_or_head().
        """
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1055
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch block data from Keep; delegates to _get_or_head().

        See _get_or_head() for the accepted keyword arguments.
        """
        return self._get_or_head(loc_s, method="GET", **kwargs)
1059
    def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
        """Get data from Keep.

        This method fetches one or more blocks of data from Keep.  It
        sends a request to each Keep service registered with the API
        server (or the proxy provided when this client was
        instantiated), then each service named in location hints, in
        sequence.  As soon as one service provides the data, it's
        returned.

        Arguments:
        * loc_s: A string of one or more comma-separated locators to fetch.
          This method returns the concatenation of these blocks.
        * num_retries: The number of times to retry GET requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  Note that, in each loop, the method may try
          to fetch data from every available Keep service, along with any
          that are named in location hints in the locator.  The default value
          is set when the KeepClient is initialized.
        """
        if ',' in loc_s:
            # Multiple locators: fetch each block and concatenate.
            return ''.join(self.get(x) for x in loc_s.split(','))

        self.get_counter.add(1)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        if headers is None:
            headers = {}
        headers['X-Request-Id'] = request_id

        slot = None
        blob = None
        try:
            locator = KeepLocator(loc_s)
            if method == "GET":
                # Reserve a cache slot; first is True when this caller
                # must perform the actual fetch (no one else has it in
                # flight or cached).
                slot, first = self.block_cache.reserve_cache(locator.md5sum)
                if not first:
                    if prefetch:
                        # this is request for a prefetch, if it is
                        # already in flight, return immediately.
                        # clear 'slot' to prevent finally block from
                        # calling slot.set()
                        slot = None
                        return None
                    self.hits_counter.add(1)
                    blob = slot.get()
                    if blob is None:
                        raise arvados.errors.KeepReadError(
                            "failed to read {}".format(loc_s))
                    return blob

            self.misses_counter.add(1)

            # If the locator has hints specifying a prefix (indicating a
            # remote keepproxy) or the UUID of a local gateway service,
            # read data from the indicated service(s) instead of the usual
            # list of local disk services.
            # NOTE(review): these hint URLs use http:// while
            # weighted_service_roots() builds https:// for the same
            # hints -- confirm which scheme is intended.
            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
                               for hint in locator.hints if (
                                       hint.startswith('K@') and
                                       len(hint) == 29 and
                                       self._gateway_services.get(hint[2:])
                                       )])
            # Map root URLs to their KeepService objects.
            roots_map = {
                root: self.KeepService(root, self._user_agent_pool,
                                       upload_counter=self.upload_counter,
                                       download_counter=self.download_counter,
                                       headers=headers,
                                       insecure=self.insecure)
                for root in hint_roots
            }

            # See #3147 for a discussion of the loop implementation.  Highlights:
            # * Refresh the list of Keep services after each failure, in case
            #   it's being updated.
            # * Retry until we succeed, we're out of retries, or every available
            #   service has returned permanent failure.
            # NOTE(review): the next assignment rebinds roots_map to {},
            # discarding the hint-based KeepService objects built just
            # above (map_new_services() later re-creates services for the
            # roots returned by weighted_service_roots) -- confirm the
            # reset is intentional.
            sorted_roots = []
            roots_map = {}
            loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                   backoff_start=2)
            for tries_left in loop:
                try:
                    sorted_roots = self.map_new_services(
                        roots_map, locator,
                        force_rebuild=(tries_left < num_retries),
                        need_writable=False,
                        headers=headers)
                except Exception as error:
                    loop.save_result(error)
                    continue

                # Query KeepService objects that haven't returned
                # permanent failure, in our specified shuffle order.
                services_to_try = [roots_map[root]
                                   for root in sorted_roots
                                   if roots_map[root].usable()]
                for keep_service in services_to_try:
                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
                    if blob is not None:
                        break
                loop.save_result((blob, len(services_to_try)))

            # Always cache the result, then return it if we succeeded.
            if loop.success():
                return blob
        finally:
            if slot is not None:
                # Fill the cache slot (None on failure) so other threads
                # waiting on this block can proceed.
                slot.set(blob)
                self.block_cache.cap_cache()

        # Q: Including 403 is necessary for the Keep tests to continue
        # passing, but maybe they should expect KeepReadError instead?
        not_founds = sum(1 for key in sorted_roots
                         if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
        service_errors = ((key, roots_map[key].last_result()['error'])
                          for key in sorted_roots)
        if not roots_map:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {}: no Keep services available ({})".format(
                    request_id, loc_s, loop.last_result()))
        elif not_founds == len(sorted_roots):
            raise arvados.errors.NotFoundError(
                "[{}] {} not found".format(request_id, loc_s), service_errors)
        else:
            raise arvados.errors.KeepReadError(
                "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1192
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        # Fall back to the cluster's default storage classes (if known).
        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Nothing to store; the locator alone satisfies the request.
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        # done_classes becomes None once we detect a cluster without
        # storage class support, disabling class tracking entirely.
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            pending_classes = []
            if done_classes is not None:
                # Only request the classes not yet confirmed.
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done_copies,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left),
                                                        classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                # Skip services that already completed in a prior round.
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                    writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1298
1299     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1300         """A stub for put().
1301
1302         This method is used in place of the real put() method when
1303         using local storage (see constructor's local_store argument).
1304
1305         copies and num_retries arguments are ignored: they are here
1306         only for the sake of offering the same call signature as
1307         put().
1308
1309         Data stored this way can be retrieved via local_store_get().
1310         """
1311         md5 = hashlib.md5(data).hexdigest()
1312         locator = '%s+%d' % (md5, len(data))
1313         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1314             f.write(data)
1315         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1316                   os.path.join(self.local_store, md5))
1317         return locator
1318
1319     def local_store_get(self, loc_s, num_retries=None):
1320         """Companion to local_store_put()."""
1321         try:
1322             locator = KeepLocator(loc_s)
1323         except ValueError:
1324             raise arvados.errors.NotFoundError(
1325                 "Invalid data locator: '%s'" % loc_s)
1326         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1327             return b''
1328         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1329             return f.read()
1330
1331     def local_store_head(self, loc_s, num_retries=None):
1332         """Companion to local_store_put()."""
1333         try:
1334             locator = KeepLocator(loc_s)
1335         except ValueError:
1336             raise arvados.errors.NotFoundError(
1337                 "Invalid data locator: '%s'" % loc_s)
1338         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1339             return True
1340         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1341             return True