17417: Merge branch 'main' into 17417-add-arm64
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import io
19 import logging
20 import math
21 import os
22 import pycurl
23 import queue
24 import re
25 import socket
26 import ssl
27 import sys
28 import threading
29 from . import timer
30 import urllib.parse
31
32 if sys.version_info >= (3, 0):
33     from io import BytesIO
34 else:
35     from cStringIO import StringIO as BytesIO
36
37 import arvados
38 import arvados.config as config
39 import arvados.errors
40 import arvados.retry as retry
41 import arvados.util
42
_logger = logging.getLogger('arvados.keep')
# Shared KeepClient used by the deprecated Keep facade class below;
# (re)built lazily by Keep.global_client_object().
global_client_object = None


# Monkey patch TCP constants when not available (apple). Values sourced from:
# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
if sys.platform == 'darwin':
    if not hasattr(socket, 'TCP_KEEPALIVE'):
        socket.TCP_KEEPALIVE = 0x010
    if not hasattr(socket, 'TCP_KEEPINTVL'):
        socket.TCP_KEEPINTVL = 0x101
    if not hasattr(socket, 'TCP_KEEPCNT'):
        socket.TCP_KEEPCNT = 0x102
56
57
class KeepLocator(object):
    """Parse, validate and re-serialize one Keep block locator.

    A locator has the form
    ``<md5sum>[+<size>][+A<signature>@<expiry>][+<other hints>...]``;
    the "A" hint carries the permission signature (40 hex digits) and
    its expiry as a hex Unix timestamp.
    """
    # Reference point for converting perm_expiry back to a Unix timestamp.
    EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
    # A hint is an uppercase letter followed by letters/digits/@/_/-.
    HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')

    def __init__(self, locator_str):
        self.hints = []
        self._perm_sig = None
        self._perm_expiry = None
        pieces = iter(locator_str.split('+'))
        self.md5sum = next(pieces)
        try:
            self.size = int(next(pieces))
        except StopIteration:
            # A bare "<md5sum>" locator carries no size (and no hints).
            self.size = None
        for hint in pieces:
            if self.HINT_RE.match(hint) is None:
                raise ValueError("invalid hint format: {}".format(hint))
            elif hint.startswith('A'):
                self.parse_permission_hint(hint)
            else:
                self.hints.append(hint)

    def __str__(self):
        # Reassemble the canonical '+'-joined form, omitting absent parts.
        return '+'.join(
            native_str(s)
            for s in [self.md5sum, self.size,
                      self.permission_hint()] + self.hints
            if s is not None)

    def stripped(self):
        """Return the locator without any hints: "md5sum[+size]"."""
        if self.size is not None:
            return "%s+%i" % (self.md5sum, self.size)
        else:
            return self.md5sum

    def _make_hex_prop(name, length):
        # Build and return a new property with the given name that
        # must be a hex string of the given length.
        data_name = '_{}'.format(name)
        def getter(self):
            return getattr(self, data_name)
        def setter(self, hex_str):
            if not arvados.util.is_hex(hex_str, length):
                raise ValueError("{} is not a {}-digit hex string: {!r}".
                                 format(name, length, hex_str))
            setattr(self, data_name, hex_str)
        return property(getter, setter)

    md5sum = _make_hex_prop('md5sum', 32)
    perm_sig = _make_hex_prop('perm_sig', 40)

    @property
    def perm_expiry(self):
        # Naive-UTC datetime when the permission signature expires, or None.
        return self._perm_expiry

    @perm_expiry.setter
    def perm_expiry(self, value):
        if not arvados.util.is_hex(value, 1, 8):
            raise ValueError(
                "permission timestamp must be a hex Unix timestamp: {}".
                format(value))
        self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))

    def permission_hint(self):
        """Return the "A<sig>@<expiry>" hint string, or None if unsigned."""
        data = [self.perm_sig, self.perm_expiry]
        if None in data:
            return None
        data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
        return "A{}@{:08x}".format(*data)

    def parse_permission_hint(self, s):
        """Populate perm_sig/perm_expiry from an "A<sig>@<expiry>" hint.

        Raises ValueError for malformed hints.
        """
        try:
            # Bug fix: a hint without '@' (or with a bad signature or
            # timestamp) raises ValueError from the unpack/setters; the
            # old bare 'except IndexError' never caught anything here.
            self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
        except (IndexError, ValueError):
            raise ValueError("bad permission hint {}".format(s))

    def permission_expired(self, as_of_dt=None):
        """Return True if the permission signature has expired.

        Compares against `as_of_dt` if given, else against "now" in UTC.
        """
        if self.perm_expiry is None:
            return False
        elif as_of_dt is None:
            # Bug fix: perm_expiry is naive UTC (built with
            # utcfromtimestamp above), so it must be compared against
            # UTC "now", not local-time datetime.now().
            as_of_dt = datetime.datetime.utcnow()
        return self.perm_expiry <= as_of_dt
140
141
class Keep(object):
    """Simple interface to a global KeepClient object.

    THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
    own API client.  The global KeepClient will build an API client from the
    current Arvados configuration, which may not match the one you built.
    """
    _last_key = None

    @classmethod
    def global_client_object(cls):
        """Return the shared KeepClient, rebuilding it on config changes.

        KeepClient no longer adapts to configuration changes at runtime,
        so we emulate the old behavior by constructing a fresh client
        whenever any relevant setting differs from the previous call.
        """
        global global_client_object
        key = (
            config.get('ARVADOS_API_HOST'),
            config.get('ARVADOS_API_TOKEN'),
            config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
            config.get('ARVADOS_KEEP_PROXY'),
            config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
            os.environ.get('KEEP_LOCAL_STORE'),
        )
        if global_client_object is None or key != cls._last_key:
            global_client_object = KeepClient()
            cls._last_key = key
        return global_client_object

    @staticmethod
    def get(locator, **kwargs):
        """Fetch a block through the shared client (deprecated)."""
        return Keep.global_client_object().get(locator, **kwargs)

    @staticmethod
    def put(data, **kwargs):
        """Store a block through the shared client (deprecated)."""
        return Keep.global_client_object().put(data, **kwargs)
176
class KeepBlockCache(object):
    """In-memory cache of Keep block contents, kept in LRU order.

    Slots are created with reserve_cache(); the thread that actually
    fetches the block calls CacheSlot.set(), which wakes any other
    threads blocked in CacheSlot.get().
    """

    # Default RAM cache is 256MiB
    def __init__(self, cache_max=(256 * 1024 * 1024)):
        self.cache_max = cache_max
        self._cache = []  # CacheSlot objects, most recently used first
        self._cache_lock = threading.Lock()

    class CacheSlot(object):
        """One cache entry; readers wait until the content is published."""
        __slots__ = ("locator", "ready", "content")

        def __init__(self, locator):
            self.locator = locator
            self.ready = threading.Event()
            self.content = None

        def get(self):
            """Block until set() has been called, then return the content."""
            self.ready.wait()
            return self.content

        def set(self, value):
            """Publish the block content (None signals a read error)."""
            self.content = value
            self.ready.set()

        def size(self):
            """Cached content size in bytes (0 while unset or failed)."""
            if self.content is None:
                return 0
            else:
                return len(self.content)

    def cap_cache(self):
        '''Cap the cache size to self.cache_max'''
        with self._cache_lock:
            # Select all slots except those where ready.is_set() and content is
            # None (that means there was an error reading the block).
            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
            sm = sum(slot.size() for slot in self._cache)
            while len(self._cache) > 0 and sm > self.cache_max:
                for i in range(len(self._cache)-1, -1, -1):
                    # Evict the least recently used slot whose content has
                    # already arrived; never evict a slot still loading.
                    if self._cache[i].ready.is_set():
                        # Subtract instead of re-summing every slot size
                        # on each eviction (the old code was O(n^2)).
                        sm -= self._cache[i].size()
                        del self._cache[i]
                        break
                else:
                    # Defensive: no evictable slot found.  This cannot
                    # currently happen (only ready slots contribute to
                    # sm), but it prevents an infinite loop if that
                    # invariant ever changes.
                    break

    def _get(self, locator):
        # Test if the locator is already in the cache
        for i in range(0, len(self._cache)):
            if self._cache[i].locator == locator:
                n = self._cache[i]
                if i != 0:
                    # move it to the front
                    del self._cache[i]
                    self._cache.insert(0, n)
                return n
        return None

    def get(self, locator):
        """Return the CacheSlot for locator, or None (thread-safe)."""
        with self._cache_lock:
            return self._get(locator)

    def reserve_cache(self, locator):
        '''Reserve a cache slot for the specified locator,
        or return the existing slot.'''
        with self._cache_lock:
            n = self._get(locator)
            if n:
                return n, False
            else:
                # Add a new cache slot for the locator
                n = KeepBlockCache.CacheSlot(locator)
                self._cache.insert(0, n)
                return n, True
248
class Counter(object):
    """A minimal thread-safe running total."""

    def __init__(self, v=0):
        """Start the counter at `v` (default 0)."""
        self._lock = threading.Lock()
        self._total = v

    def add(self, v):
        """Atomically add `v` (may be negative) to the total."""
        with self._lock:
            self._total += v

    def get(self):
        """Atomically read the current total."""
        with self._lock:
            return self._total
262
263 class KeepClient(object):
264
    # Timeout tuples are (connection timeout, data read timeout, minimum
    # bandwidth in bytes/second); consumed by KeepService._setcurltimeouts().
    # Default Keep server connection timeout:  2 seconds
    # Default Keep server read timeout:       256 seconds
    # Default Keep server bandwidth minimum:  32768 bytes per second
    # Default Keep proxy connection timeout:  20 seconds
    # Default Keep proxy read timeout:        256 seconds
    # Default Keep proxy bandwidth minimum:   32768 bytes per second
    DEFAULT_TIMEOUT = (2, 256, 32768)
    DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
273
274
275     class KeepService(object):
276         """Make requests to a single Keep service, and track results.
277
278         A KeepService is intended to last long enough to perform one
279         transaction (GET or PUT) against one Keep service. This can
280         involve calling either get() or put() multiple times in order
281         to retry after transient failures. However, calling both get()
282         and put() on a single instance -- or using the same instance
283         to access two different Keep services -- will not produce
284         sensible behavior.
285         """
286
287         HTTP_ERRORS = (
288             socket.error,
289             ssl.SSLError,
290             arvados.errors.HttpError,
291         )
292
293         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
294                      upload_counter=None,
295                      download_counter=None,
296                      headers={},
297                      insecure=False):
298             self.root = root
299             self._user_agent_pool = user_agent_pool
300             self._result = {'error': None}
301             self._usable = True
302             self._session = None
303             self._socket = None
304             self.get_headers = {'Accept': 'application/octet-stream'}
305             self.get_headers.update(headers)
306             self.put_headers = headers
307             self.upload_counter = upload_counter
308             self.download_counter = download_counter
309             self.insecure = insecure
310
311         def usable(self):
312             """Is it worth attempting a request?"""
313             return self._usable
314
315         def finished(self):
316             """Did the request succeed or encounter permanent failure?"""
317             return self._result['error'] == False or not self._usable
318
319         def last_result(self):
320             return self._result
321
322         def _get_user_agent(self):
323             try:
324                 return self._user_agent_pool.get(block=False)
325             except queue.Empty:
326                 return pycurl.Curl()
327
328         def _put_user_agent(self, ua):
329             try:
330                 ua.reset()
331                 self._user_agent_pool.put(ua, block=False)
332             except:
333                 ua.close()
334
335         def _socket_open(self, *args, **kwargs):
336             if len(args) + len(kwargs) == 2:
337                 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
338             else:
339                 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
340
341         def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
342             return self._socket_open_pycurl_7_21_5(
343                 purpose=None,
344                 address=collections.namedtuple(
345                     'Address', ['family', 'socktype', 'protocol', 'addr'],
346                 )(family, socktype, protocol, address))
347
348         def _socket_open_pycurl_7_21_5(self, purpose, address):
349             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
350             s = socket.socket(address.family, address.socktype, address.protocol)
351             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
352             # Will throw invalid protocol error on mac. This test prevents that.
353             if hasattr(socket, 'TCP_KEEPIDLE'):
354                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
355             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
356             self._socket = s
357             return s
358
359         def get(self, locator, method="GET", timeout=None):
360             # locator is a KeepLocator object.
361             url = self.root + str(locator)
362             _logger.debug("Request: %s %s", method, url)
363             curl = self._get_user_agent()
364             ok = None
365             try:
366                 with timer.Timer() as t:
367                     self._headers = {}
368                     response_body = BytesIO()
369                     curl.setopt(pycurl.NOSIGNAL, 1)
370                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
371                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
372                     curl.setopt(pycurl.URL, url.encode('utf-8'))
373                     curl.setopt(pycurl.HTTPHEADER, [
374                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
375                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
376                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
377                     if self.insecure:
378                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
379                     else:
380                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
381                     if method == "HEAD":
382                         curl.setopt(pycurl.NOBODY, True)
383                     self._setcurltimeouts(curl, timeout, method=="HEAD")
384
385                     try:
386                         curl.perform()
387                     except Exception as e:
388                         raise arvados.errors.HttpError(0, str(e))
389                     finally:
390                         if self._socket:
391                             self._socket.close()
392                             self._socket = None
393                     self._result = {
394                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
395                         'body': response_body.getvalue(),
396                         'headers': self._headers,
397                         'error': False,
398                     }
399
400                 ok = retry.check_http_response_success(self._result['status_code'])
401                 if not ok:
402                     self._result['error'] = arvados.errors.HttpError(
403                         self._result['status_code'],
404                         self._headers.get('x-status-line', 'Error'))
405             except self.HTTP_ERRORS as e:
406                 self._result = {
407                     'error': e,
408                 }
409             self._usable = ok != False
410             if self._result.get('status_code', None):
411                 # The client worked well enough to get an HTTP status
412                 # code, so presumably any problems are just on the
413                 # server side and it's OK to reuse the client.
414                 self._put_user_agent(curl)
415             else:
416                 # Don't return this client to the pool, in case it's
417                 # broken.
418                 curl.close()
419             if not ok:
420                 _logger.debug("Request fail: GET %s => %s: %s",
421                               url, type(self._result['error']), str(self._result['error']))
422                 return None
423             if method == "HEAD":
424                 _logger.info("HEAD %s: %s bytes",
425                          self._result['status_code'],
426                          self._result.get('content-length'))
427                 if self._result['headers'].get('x-keep-locator'):
428                     # This is a response to a remote block copy request, return
429                     # the local copy block locator.
430                     return self._result['headers'].get('x-keep-locator')
431                 return True
432
433             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
434                          self._result['status_code'],
435                          len(self._result['body']),
436                          t.msecs,
437                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
438
439             if self.download_counter:
440                 self.download_counter.add(len(self._result['body']))
441             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
442             if resp_md5 != locator.md5sum:
443                 _logger.warning("Checksum fail: md5(%s) = %s",
444                                 url, resp_md5)
445                 self._result['error'] = arvados.errors.HttpError(
446                     0, 'Checksum fail')
447                 return None
448             return self._result['body']
449
450         def put(self, hash_s, body, timeout=None, headers={}):
451             put_headers = copy.copy(self.put_headers)
452             put_headers.update(headers)
453             url = self.root + hash_s
454             _logger.debug("Request: PUT %s", url)
455             curl = self._get_user_agent()
456             ok = None
457             try:
458                 with timer.Timer() as t:
459                     self._headers = {}
460                     body_reader = BytesIO(body)
461                     response_body = BytesIO()
462                     curl.setopt(pycurl.NOSIGNAL, 1)
463                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
464                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
465                     curl.setopt(pycurl.URL, url.encode('utf-8'))
466                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
467                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
468                     # response) instead of sending the request body immediately.
469                     # This allows the server to reject the request if the request
470                     # is invalid or the server is read-only, without waiting for
471                     # the client to send the entire block.
472                     curl.setopt(pycurl.UPLOAD, True)
473                     curl.setopt(pycurl.INFILESIZE, len(body))
474                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
475                     curl.setopt(pycurl.HTTPHEADER, [
476                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
477                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
478                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
479                     if self.insecure:
480                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
481                     else:
482                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
483                     self._setcurltimeouts(curl, timeout)
484                     try:
485                         curl.perform()
486                     except Exception as e:
487                         raise arvados.errors.HttpError(0, str(e))
488                     finally:
489                         if self._socket:
490                             self._socket.close()
491                             self._socket = None
492                     self._result = {
493                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
494                         'body': response_body.getvalue().decode('utf-8'),
495                         'headers': self._headers,
496                         'error': False,
497                     }
498                 ok = retry.check_http_response_success(self._result['status_code'])
499                 if not ok:
500                     self._result['error'] = arvados.errors.HttpError(
501                         self._result['status_code'],
502                         self._headers.get('x-status-line', 'Error'))
503             except self.HTTP_ERRORS as e:
504                 self._result = {
505                     'error': e,
506                 }
507             self._usable = ok != False # still usable if ok is True or None
508             if self._result.get('status_code', None):
509                 # Client is functional. See comment in get().
510                 self._put_user_agent(curl)
511             else:
512                 curl.close()
513             if not ok:
514                 _logger.debug("Request fail: PUT %s => %s: %s",
515                               url, type(self._result['error']), str(self._result['error']))
516                 return False
517             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
518                          self._result['status_code'],
519                          len(body),
520                          t.msecs,
521                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
522             if self.upload_counter:
523                 self.upload_counter.add(len(body))
524             return True
525
526         def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
527             if not timeouts:
528                 return
529             elif isinstance(timeouts, tuple):
530                 if len(timeouts) == 2:
531                     conn_t, xfer_t = timeouts
532                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
533                 else:
534                     conn_t, xfer_t, bandwidth_bps = timeouts
535             else:
536                 conn_t, xfer_t = (timeouts, timeouts)
537                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
538             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
539             if not ignore_bandwidth:
540                 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
541                 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
542
543         def _headerfunction(self, header_line):
544             if isinstance(header_line, bytes):
545                 header_line = header_line.decode('iso-8859-1')
546             if ':' in header_line:
547                 name, value = header_line.split(':', 1)
548                 name = name.strip().lower()
549                 value = value.strip()
550             elif self._headers:
551                 name = self._lastheadername
552                 value = self._headers[name] + ' ' + header_line.strip()
553             elif header_line.startswith('HTTP/'):
554                 name = 'x-status-line'
555                 value = header_line
556             else:
557                 _logger.error("Unexpected header line: %s", header_line)
558                 return
559             self._lastheadername = name
560             self._headers[name] = value
561             # Returning None implies all bytes were written
562
563
564     class KeepWriterQueue(queue.Queue):
565         def __init__(self, copies, classes=[]):
566             queue.Queue.__init__(self) # Old-style superclass
567             self.wanted_copies = copies
568             self.wanted_storage_classes = classes
569             self.successful_copies = 0
570             self.confirmed_storage_classes = {}
571             self.response = None
572             self.storage_classes_tracking = True
573             self.queue_data_lock = threading.RLock()
574             self.pending_tries = max(copies, len(classes))
575             self.pending_tries_notification = threading.Condition()
576
577         def write_success(self, response, replicas_nr, classes_confirmed):
578             with self.queue_data_lock:
579                 self.successful_copies += replicas_nr
580                 if classes_confirmed is None:
581                     self.storage_classes_tracking = False
582                 elif self.storage_classes_tracking:
583                     for st_class, st_copies in classes_confirmed.items():
584                         try:
585                             self.confirmed_storage_classes[st_class] += st_copies
586                         except KeyError:
587                             self.confirmed_storage_classes[st_class] = st_copies
588                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
589                 self.response = response
590             with self.pending_tries_notification:
591                 self.pending_tries_notification.notify_all()
592
593         def write_fail(self, ks):
594             with self.pending_tries_notification:
595                 self.pending_tries += 1
596                 self.pending_tries_notification.notify()
597
598         def pending_copies(self):
599             with self.queue_data_lock:
600                 return self.wanted_copies - self.successful_copies
601
602         def satisfied_classes(self):
603             with self.queue_data_lock:
604                 if not self.storage_classes_tracking:
605                     # Notifies disabled storage classes expectation to
606                     # the outer loop.
607                     return None
608             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
609
610         def pending_classes(self):
611             with self.queue_data_lock:
612                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
613                     return []
614                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
615                 for st_class, st_copies in self.confirmed_storage_classes.items():
616                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
617                         unsatisfied_classes.remove(st_class)
618                 return unsatisfied_classes
619
620         def get_next_task(self):
621             with self.pending_tries_notification:
622                 while True:
623                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
624                         # This notify_all() is unnecessary --
625                         # write_success() already called notify_all()
626                         # when pending<1 became true, so it's not
627                         # possible for any other thread to be in
628                         # wait() now -- but it's cheap insurance
629                         # against deadlock so we do it anyway:
630                         self.pending_tries_notification.notify_all()
631                         # Drain the queue and then raise Queue.Empty
632                         while True:
633                             self.get_nowait()
634                             self.task_done()
635                     elif self.pending_tries > 0:
636                         service, service_root = self.get_nowait()
637                         if service.finished():
638                             self.task_done()
639                             continue
640                         self.pending_tries -= 1
641                         return service, service_root
642                     elif self.empty():
643                         self.pending_tries_notification.notify_all()
644                         raise queue.Empty
645                     else:
646                         self.pending_tries_notification.wait()
647
648
649     class KeepWriterThreadPool(object):
650         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
651             self.total_task_nr = 0
652             if (not max_service_replicas) or (max_service_replicas >= copies):
653                 num_threads = 1
654             else:
655                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
656             _logger.debug("Pool max threads is %d", num_threads)
657             self.workers = []
658             self.queue = KeepClient.KeepWriterQueue(copies, classes)
659             # Create workers
660             for _ in range(num_threads):
661                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
662                 self.workers.append(w)
663
664         def add_task(self, ks, service_root):
665             self.queue.put((ks, service_root))
666             self.total_task_nr += 1
667
668         def done(self):
669             return self.queue.successful_copies, self.queue.satisfied_classes()
670
671         def join(self):
672             # Start workers
673             for worker in self.workers:
674                 worker.start()
675             # Wait for finished work
676             self.queue.join()
677
678         def response(self):
679             return self.queue.response
680
681
    class KeepWriterThread(threading.Thread):
        """Worker that pulls (service, root) tasks off a KeepWriterQueue
        and PUTs one block to each service until the queue is drained."""

        # Internal signal: this service attempt failed; details are
        # already recorded in the service's last_result().
        class TaskFailed(RuntimeError): pass

        def __init__(self, queue, data, data_hash, timeout=None):
            # queue: KeepWriterQueue shared with the other workers.
            # data: block content (bytes); data_hash: its hash/locator string.
            # timeout: per-request timeout forwarded to KeepService.put().
            super(KeepClient.KeepWriterThread, self).__init__()
            self.timeout = timeout
            self.queue = queue
            self.data = data
            self.data_hash = data_hash
            self.daemon = True

        def run(self):
            # Keep taking tasks until the queue signals completion by
            # raising queue.Empty from get_next_task().
            while True:
                try:
                    service, service_root = self.queue.get_next_task()
                except queue.Empty:
                    return
                try:
                    locator, copies, classes = self.do_task(service, service_root)
                except Exception as e:
                    # TaskFailed is the expected failure path; anything
                    # else is a bug worth a full traceback.
                    if not isinstance(e, self.TaskFailed):
                        _logger.exception("Exception in KeepWriterThread")
                    self.queue.write_fail(service)
                else:
                    self.queue.write_success(locator, copies, classes)
                finally:
                    self.queue.task_done()

        def do_task(self, service, service_root):
            """PUT the block to one service; return (locator, replicas
            stored, storage classes confirmed) or raise TaskFailed."""
            # Ask only for the storage classes that still need copies.
            classes = self.queue.pending_classes()
            headers = {}
            if len(classes) > 0:
                classes.sort()
                headers['X-Keep-Storage-Classes'] = ', '.join(classes)
            success = bool(service.put(self.data_hash,
                                        self.data,
                                        timeout=self.timeout,
                                        headers=headers))
            result = service.last_result()

            if not success:
                if result.get('status_code'):
                    _logger.debug("Request fail: PUT %s => %s %s",
                                  self.data_hash,
                                  result.get('status_code'),
                                  result.get('body'))
                raise self.TaskFailed()

            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                          str(threading.current_thread()),
                          self.data_hash,
                          len(self.data),
                          service_root)
            # The server reports how many replicas it stored; assume 1
            # when the header is missing or malformed.
            try:
                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
            except (KeyError, ValueError):
                replicas_stored = 1

            # Parse "class=count, class=count" confirmations; None means
            # the header was absent or corrupt, so class tracking is off.
            classes_confirmed = {}
            try:
                scch = result['headers']['x-keep-storage-classes-confirmed']
                for confirmation in scch.replace(' ', '').split(','):
                    if '=' in confirmation:
                        stored_class, stored_copies = confirmation.split('=')[:2]
                        classes_confirmed[stored_class] = int(stored_copies)
            except (KeyError, ValueError):
                # Storage classes confirmed header missing or corrupt
                classes_confirmed = None

            return result['body'].strip(), replicas_stored, classes_confirmed
752
753
    def __init__(self, api_client=None, proxy=None,
                 timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
                 api_token=None, local_store=None, block_cache=None,
                 num_retries=0, session=None):
        """Initialize a new KeepClient.

        Arguments:
        :api_client:
          The API client to use to find Keep services.  If not
          provided, KeepClient will build one from available Arvados
          configuration.

        :proxy:
          If specified, this KeepClient will send requests to this Keep
          proxy.  Otherwise, KeepClient will fall back to the setting of the
          ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
          If you want to ensure KeepClient does not use a proxy, pass in an
          empty string.

        :timeout:
          The initial timeout (in seconds) for HTTP requests to Keep
          non-proxy servers.  A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). A connection
          will be aborted if the average traffic rate falls below
          minimum_bandwidth bytes per second over an interval of read_timeout
          seconds. Because timeouts are often a result of transient server
          load, the actual connection timeout will be increased by a factor
          of two on each retry.
          Default: (2, 256, 32768).

        :proxy_timeout:
          The initial timeout (in seconds) for HTTP requests to
          Keep proxies. A tuple of three floats is interpreted as
          (connection_timeout, read_timeout, minimum_bandwidth). The behavior
          described above for adjusting connection timeouts on retry also
          applies.
          Default: (20, 256, 32768).

        :api_token:
          If you're not using an API client, but only talking
          directly to a Keep proxy, this parameter specifies an API token
          to authenticate Keep requests.  It is an error to specify both
          api_client and api_token.  If you specify neither, KeepClient
          will use one available from the Arvados configuration.

        :local_store:
          If specified, this KeepClient will bypass Keep
          services, and save data to the named directory.  If unspecified,
          KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
          environment variable.  If you want to ensure KeepClient does not
          use local storage, pass in an empty string.  This is primarily
          intended to mock a server for testing.

        :block_cache:
          The block cache to use.  If not provided, a new KeepBlockCache
          with default parameters is created.

        :num_retries:
          The default number of times to retry failed requests.
          This will be used as the default num_retries value when get() and
          put() are called.  Default 0.

        :session:
          Not referenced by this constructor; presumably accepted for
          backward compatibility with older callers -- TODO confirm.
        """
        self.lock = threading.Lock()
        if proxy is None:
            # No explicit proxy: fall back to the Arvados configuration.
            if config.get('ARVADOS_KEEP_SERVICES'):
                proxy = config.get('ARVADOS_KEEP_SERVICES')
            else:
                proxy = config.get('ARVADOS_KEEP_PROXY')
        if api_token is None:
            if api_client is None:
                api_token = config.get('ARVADOS_API_TOKEN')
            else:
                api_token = api_client.api_token
        elif api_client is not None:
            raise ValueError(
                "can't build KeepClient with both API client and token")
        if local_store is None:
            local_store = os.environ.get('KEEP_LOCAL_STORE')

        if api_client is None:
            self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
        else:
            self.insecure = api_client.insecure

        self.block_cache = block_cache if block_cache else KeepBlockCache()
        self.timeout = timeout
        self.proxy_timeout = proxy_timeout
        # Pool of reusable user agents shared by KeepService objects.
        self._user_agent_pool = queue.LifoQueue()
        # Byte/request counters for usage statistics.
        self.upload_counter = Counter()
        self.download_counter = Counter()
        self.put_counter = Counter()
        self.get_counter = Counter()
        self.hits_counter = Counter()
        self.misses_counter = Counter()
        # Warn at most once when the cluster ignores storage classes.
        self._storage_classes_unsupported_warning = False
        self._default_classes = []

        if local_store:
            # Local-store mode: replace the network methods with the
            # local_store_* stubs that read/write files in local_store.
            self.local_store = local_store
            self.head = self.local_store_head
            self.get = self.local_store_get
            self.put = self.local_store_put
        else:
            self.num_retries = num_retries
            self.max_replicas_per_service = None
            if proxy:
                proxy_uris = proxy.split()
                for i in range(len(proxy_uris)):
                    if not proxy_uris[i].endswith('/'):
                        proxy_uris[i] += '/'
                    # URL validation
                    url = urllib.parse.urlparse(proxy_uris[i])
                    if not (url.scheme and url.netloc):
                        raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
                self.api_token = api_token
                self._gateway_services = {}
                # Build one synthetic service entry per proxy URI; this
                # static list is never refreshed from the API server.
                self._keep_services = [{
                    'uuid': "00000-bi6l4-%015d" % idx,
                    'service_type': 'proxy',
                    '_service_root': uri,
                    } for idx, uri in enumerate(proxy_uris)]
                self._writable_services = self._keep_services
                self.using_proxy = True
                self._static_services_list = True
            else:
                # It's important to avoid instantiating an API client
                # unless we actually need one, for testing's sake.
                if api_client is None:
                    api_client = arvados.api('v1')
                self.api_client = api_client
                self.api_token = api_client.api_token
                self._gateway_services = {}
                self._keep_services = None
                self._writable_services = None
                self.using_proxy = None
                self._static_services_list = False
                try:
                    # Remember the cluster's default storage classes for
                    # use by put() when the caller specifies none.
                    self._default_classes = [
                        k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
                except KeyError:
                    # We're talking to an old cluster
                    pass
892
893     def current_timeout(self, attempt_number):
894         """Return the appropriate timeout to use for this client.
895
896         The proxy timeout setting if the backend service is currently a proxy,
897         the regular timeout setting otherwise.  The `attempt_number` indicates
898         how many times the operation has been tried already (starting from 0
899         for the first try), and scales the connection timeout portion of the
900         return value accordingly.
901
902         """
903         # TODO(twp): the timeout should be a property of a
904         # KeepService, not a KeepClient. See #4488.
905         t = self.proxy_timeout if self.using_proxy else self.timeout
906         if len(t) == 2:
907             return (t[0] * (1 << attempt_number), t[1])
908         else:
909             return (t[0] * (1 << attempt_number), t[1], t[2])
910     def _any_nondisk_services(self, service_list):
911         return any(ks.get('service_type', 'disk') != 'disk'
912                    for ks in service_list)
913
    def build_services_list(self, force_rebuild=False):
        """Populate the cached Keep service lists from the API server.

        No-op when a static (proxy-based) service list is in use, or
        when the services were already fetched and force_rebuild is
        false.  Sets _gateway_services, _keep_services,
        _writable_services and max_replicas_per_service.
        """
        if (self._static_services_list or
              (self._keep_services and not force_rebuild)):
            return
        with self.lock:
            try:
                keep_services = self.api_client.keep_services().accessible()
            except Exception:  # API server predates Keep services.
                keep_services = self.api_client.keep_disks().list()

            # Gateway services are only used when specified by UUID,
            # so there's nothing to gain by filtering them by
            # service_type.
            self._gateway_services = {ks['uuid']: ks for ks in
                                      keep_services.execute()['items']}
            if not self._gateway_services:
                raise arvados.errors.NoKeepServersError()

            # Precompute the base URI for each service.
            for r in self._gateway_services.values():
                host = r['service_host']
                if not host.startswith('[') and host.find(':') >= 0:
                    # IPv6 URIs must be formatted like http://[::1]:80/...
                    host = '[' + host + ']'
                r['_service_root'] = "{}://{}:{:d}/".format(
                    'https' if r['service_ssl_flag'] else 'http',
                    host,
                    r['service_port'])

            _logger.debug(str(self._gateway_services))
            # Non-gateway services handle ordinary read requests;
            # writable ones additionally exclude read-only services.
            self._keep_services = [
                ks for ks in self._gateway_services.values()
                if not ks.get('service_type', '').startswith('gateway:')]
            self._writable_services = [ks for ks in self._keep_services
                                       if not ks.get('read_only')]

            # For disk type services, max_replicas_per_service is 1
            # It is unknown (unlimited) for other service types.
            if self._any_nondisk_services(self._writable_services):
                self.max_replicas_per_service = None
            else:
                self.max_replicas_per_service = 1
956
957     def _service_weight(self, data_hash, service_uuid):
958         """Compute the weight of a Keep service endpoint for a data
959         block with a known hash.
960
961         The weight is md5(h + u) where u is the last 15 characters of
962         the service endpoint's UUID.
963         """
964         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
965
966     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
967         """Return an array of Keep service endpoints, in the order in
968         which they should be probed when reading or writing data with
969         the given hash+hints.
970         """
971         self.build_services_list(force_rebuild)
972
973         sorted_roots = []
974         # Use the services indicated by the given +K@... remote
975         # service hints, if any are present and can be resolved to a
976         # URI.
977         for hint in locator.hints:
978             if hint.startswith('K@'):
979                 if len(hint) == 7:
980                     sorted_roots.append(
981                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
982                 elif len(hint) == 29:
983                     svc = self._gateway_services.get(hint[2:])
984                     if svc:
985                         sorted_roots.append(svc['_service_root'])
986
987         # Sort the available local services by weight (heaviest first)
988         # for this locator, and return their service_roots (base URIs)
989         # in that order.
990         use_services = self._keep_services
991         if need_writable:
992             use_services = self._writable_services
993         self.using_proxy = self._any_nondisk_services(use_services)
994         sorted_roots.extend([
995             svc['_service_root'] for svc in sorted(
996                 use_services,
997                 reverse=True,
998                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
999         _logger.debug("{}: {}".format(locator, sorted_roots))
1000         return sorted_roots
1001
1002     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1003         # roots_map is a dictionary, mapping Keep service root strings
1004         # to KeepService objects.  Poll for Keep services, and add any
1005         # new ones to roots_map.  Return the current list of local
1006         # root strings.
1007         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1008         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1009         for root in local_roots:
1010             if root not in roots_map:
1011                 roots_map[root] = self.KeepService(
1012                     root, self._user_agent_pool,
1013                     upload_counter=self.upload_counter,
1014                     download_counter=self.download_counter,
1015                     headers=headers,
1016                     insecure=self.insecure)
1017         return local_roots
1018
1019     @staticmethod
1020     def _check_loop_result(result):
1021         # KeepClient RetryLoops should save results as a 2-tuple: the
1022         # actual result of the request, and the number of servers available
1023         # to receive the request this round.
1024         # This method returns True if there's a real result, False if
1025         # there are no more servers available, otherwise None.
1026         if isinstance(result, Exception):
1027             return None
1028         result, tried_server_count = result
1029         if (result is not None) and (result is not False):
1030             return True
1031         elif tried_server_count < 1:
1032             _logger.info("No more Keep services to try; giving up")
1033             return False
1034         else:
1035             return None
1036
1037     def get_from_cache(self, loc):
1038         """Fetch a block only if is in the cache, otherwise return None."""
1039         slot = self.block_cache.get(loc)
1040         if slot is not None and slot.ready.is_set():
1041             return slot.get()
1042         else:
1043             return None
1044
1045     def refresh_signature(self, loc):
1046         """Ask Keep to get the remote block and return its local signature"""
1047         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1048         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1049
    @retry.retry_method
    def head(self, loc_s, **kwargs):
        """Check block availability in Keep without fetching its content.

        Thin wrapper around _get_or_head() with method="HEAD"; see
        _get_or_head() for the accepted keyword arguments.
        """
        return self._get_or_head(loc_s, method="HEAD", **kwargs)
1053
    @retry.retry_method
    def get(self, loc_s, **kwargs):
        """Fetch block data from Keep.

        Thin wrapper around _get_or_head() with method="GET"; see
        _get_or_head() for the accepted keyword arguments.
        """
        return self._get_or_head(loc_s, method="GET", **kwargs)
1057
1058     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
1059         """Get data from Keep.
1060
1061         This method fetches one or more blocks of data from Keep.  It
1062         sends a request each Keep service registered with the API
1063         server (or the proxy provided when this client was
1064         instantiated), then each service named in location hints, in
1065         sequence.  As soon as one service provides the data, it's
1066         returned.
1067
1068         Arguments:
1069         * loc_s: A string of one or more comma-separated locators to fetch.
1070           This method returns the concatenation of these blocks.
1071         * num_retries: The number of times to retry GET requests to
1072           *each* Keep server if it returns temporary failures, with
1073           exponential backoff.  Note that, in each loop, the method may try
1074           to fetch data from every available Keep service, along with any
1075           that are named in location hints in the locator.  The default value
1076           is set when the KeepClient is initialized.
1077         """
1078         if ',' in loc_s:
1079             return ''.join(self.get(x) for x in loc_s.split(','))
1080
1081         self.get_counter.add(1)
1082
1083         request_id = (request_id or
1084                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1085                       arvados.util.new_request_id())
1086         if headers is None:
1087             headers = {}
1088         headers['X-Request-Id'] = request_id
1089
1090         slot = None
1091         blob = None
1092         try:
1093             locator = KeepLocator(loc_s)
1094             if method == "GET":
1095                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1096                 if not first:
1097                     self.hits_counter.add(1)
1098                     blob = slot.get()
1099                     if blob is None:
1100                         raise arvados.errors.KeepReadError(
1101                             "failed to read {}".format(loc_s))
1102                     return blob
1103
1104             self.misses_counter.add(1)
1105
1106             # If the locator has hints specifying a prefix (indicating a
1107             # remote keepproxy) or the UUID of a local gateway service,
1108             # read data from the indicated service(s) instead of the usual
1109             # list of local disk services.
1110             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1111                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1112             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1113                                for hint in locator.hints if (
1114                                        hint.startswith('K@') and
1115                                        len(hint) == 29 and
1116                                        self._gateway_services.get(hint[2:])
1117                                        )])
1118             # Map root URLs to their KeepService objects.
1119             roots_map = {
1120                 root: self.KeepService(root, self._user_agent_pool,
1121                                        upload_counter=self.upload_counter,
1122                                        download_counter=self.download_counter,
1123                                        headers=headers,
1124                                        insecure=self.insecure)
1125                 for root in hint_roots
1126             }
1127
1128             # See #3147 for a discussion of the loop implementation.  Highlights:
1129             # * Refresh the list of Keep services after each failure, in case
1130             #   it's being updated.
1131             # * Retry until we succeed, we're out of retries, or every available
1132             #   service has returned permanent failure.
1133             sorted_roots = []
1134             roots_map = {}
1135             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1136                                    backoff_start=2)
1137             for tries_left in loop:
1138                 try:
1139                     sorted_roots = self.map_new_services(
1140                         roots_map, locator,
1141                         force_rebuild=(tries_left < num_retries),
1142                         need_writable=False,
1143                         headers=headers)
1144                 except Exception as error:
1145                     loop.save_result(error)
1146                     continue
1147
1148                 # Query KeepService objects that haven't returned
1149                 # permanent failure, in our specified shuffle order.
1150                 services_to_try = [roots_map[root]
1151                                    for root in sorted_roots
1152                                    if roots_map[root].usable()]
1153                 for keep_service in services_to_try:
1154                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1155                     if blob is not None:
1156                         break
1157                 loop.save_result((blob, len(services_to_try)))
1158
1159             # Always cache the result, then return it if we succeeded.
1160             if loop.success():
1161                 return blob
1162         finally:
1163             if slot is not None:
1164                 slot.set(blob)
1165                 self.block_cache.cap_cache()
1166
1167         # Q: Including 403 is necessary for the Keep tests to continue
1168         # passing, but maybe they should expect KeepReadError instead?
1169         not_founds = sum(1 for key in sorted_roots
1170                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1171         service_errors = ((key, roots_map[key].last_result()['error'])
1172                           for key in sorted_roots)
1173         if not roots_map:
1174             raise arvados.errors.KeepReadError(
1175                 "[{}] failed to read {}: no Keep services available ({})".format(
1176                     request_id, loc_s, loop.last_result()))
1177         elif not_founds == len(sorted_roots):
1178             raise arvados.errors.NotFoundError(
1179                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1180         else:
1181             raise arvados.errors.KeepReadError(
1182                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1183
    @retry.retry_method
    def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
        """Save data in Keep.

        This method will get a list of Keep services from the API server, and
        send the data to each one simultaneously in a new thread.  Once the
        uploads are finished, if enough copies are saved, this method returns
        the most recent HTTP response body.  If requests fail to upload
        enough copies, this method raises KeepWriteError.

        Arguments:
        * data: The string of data to upload.
        * copies: The number of copies that the user requires be saved.
          Default 2.
        * num_retries: The number of times to retry PUT requests to
          *each* Keep server if it returns temporary failures, with
          exponential backoff.  The default value is set when the
          KeepClient is initialized.
        * classes: An optional list of storage class names where copies should
          be written.
        """

        # Fall back to the cluster's default storage classes when the
        # caller doesn't specify any.
        classes = classes or self._default_classes

        if not isinstance(data, bytes):
            data = data.encode()

        self.put_counter.add(1)

        data_hash = hashlib.md5(data).hexdigest()
        loc_s = data_hash + '+' + str(len(data))
        if copies < 1:
            # Zero copies requested: nothing to upload.
            return loc_s
        locator = KeepLocator(loc_s)

        request_id = (request_id or
                      (hasattr(self, 'api_client') and self.api_client.request_id) or
                      arvados.util.new_request_id())
        headers = {
            'X-Request-Id': request_id,
            'X-Keep-Desired-Replicas': str(copies),
        }
        roots_map = {}
        loop = retry.RetryLoop(num_retries, self._check_loop_result,
                               backoff_start=2)
        done_copies = 0
        # done_classes accumulates confirmed storage classes; it becomes
        # None once we learn the cluster ignores storage classes.
        done_classes = []
        for tries_left in loop:
            try:
                sorted_roots = self.map_new_services(
                    roots_map, locator,
                    force_rebuild=(tries_left < num_retries),
                    need_writable=True,
                    headers=headers)
            except Exception as error:
                loop.save_result(error)
                continue

            # Only ask for the classes not yet confirmed by earlier rounds.
            pending_classes = []
            if done_classes is not None:
                pending_classes = list(set(classes) - set(done_classes))
            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                        data_hash=data_hash,
                                                        copies=copies - done_copies,
                                                        max_service_replicas=self.max_replicas_per_service,
                                                        timeout=self.current_timeout(num_retries - tries_left),
                                                        classes=pending_classes)
            for service_root, ks in [(root, roots_map[root])
                                     for root in sorted_roots]:
                if ks.finished():
                    continue
                writer_pool.add_task(ks, service_root)
            writer_pool.join()
            pool_copies, pool_classes = writer_pool.done()
            done_copies += pool_copies
            if (done_classes is not None) and (pool_classes is not None):
                done_classes += pool_classes
                loop.save_result(
                    (done_copies >= copies and set(done_classes) == set(classes),
                    writer_pool.total_task_nr))
            else:
                # Old keepstore contacted without storage classes support:
                # success is determined only by successful copies.
                #
                # Disable storage classes tracking from this point forward.
                if not self._storage_classes_unsupported_warning:
                    self._storage_classes_unsupported_warning = True
                    _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
                done_classes = None
                loop.save_result(
                    (done_copies >= copies, writer_pool.total_task_nr))

        if loop.success():
            return writer_pool.response()
        if not roots_map:
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {}: no Keep services available ({})".format(
                    request_id, data_hash, loop.last_result()))
        else:
            service_errors = ((key, roots_map[key].last_result()['error'])
                              for key in sorted_roots
                              if roots_map[key].last_result()['error'])
            raise arvados.errors.KeepWriteError(
                "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
                    request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1289
1290     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1291         """A stub for put().
1292
1293         This method is used in place of the real put() method when
1294         using local storage (see constructor's local_store argument).
1295
1296         copies and num_retries arguments are ignored: they are here
1297         only for the sake of offering the same call signature as
1298         put().
1299
1300         Data stored this way can be retrieved via local_store_get().
1301         """
1302         md5 = hashlib.md5(data).hexdigest()
1303         locator = '%s+%d' % (md5, len(data))
1304         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1305             f.write(data)
1306         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1307                   os.path.join(self.local_store, md5))
1308         return locator
1309
1310     def local_store_get(self, loc_s, num_retries=None):
1311         """Companion to local_store_put()."""
1312         try:
1313             locator = KeepLocator(loc_s)
1314         except ValueError:
1315             raise arvados.errors.NotFoundError(
1316                 "Invalid data locator: '%s'" % loc_s)
1317         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1318             return b''
1319         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1320             return f.read()
1321
1322     def local_store_head(self, loc_s, num_retries=None):
1323         """Companion to local_store_put()."""
1324         try:
1325             locator = KeepLocator(loc_s)
1326         except ValueError:
1327             raise arvados.errors.NotFoundError(
1328                 "Invalid data locator: '%s'" % loc_s)
1329         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1330             return True
1331         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1332             return True
1333
1334     def is_cached(self, locator):
1335         return self.block_cache.reserve_cache(expect_hash)