df01c3a55b74e3e1792f1c8e0b0d1846e45e20ee
[arvados.git] / sdk / python / arvados / keep.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from __future__ import division
7 import copy
8 from future import standard_library
9 from future.utils import native_str
10 standard_library.install_aliases()
11 from builtins import next
12 from builtins import str
13 from builtins import range
14 from builtins import object
15 import collections
16 import datetime
17 import hashlib
18 import io
19 import logging
20 import math
21 import os
22 import pycurl
23 import queue
24 import re
25 import socket
26 import ssl
27 import sys
28 import threading
29 from . import timer
30 import urllib.parse
31
32 if sys.version_info >= (3, 0):
33     from io import BytesIO
34 else:
35     from cStringIO import StringIO as BytesIO
36
37 import arvados
38 import arvados.config as config
39 import arvados.errors
40 import arvados.retry as retry
41 import arvados.util
42
43 _logger = logging.getLogger('arvados.keep')
44 global_client_object = None
45
46
47 # Monkey patch TCP constants when not available (apple). Values sourced from:
48 # http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
49 if sys.platform == 'darwin':
50     if not hasattr(socket, 'TCP_KEEPALIVE'):
51         socket.TCP_KEEPALIVE = 0x010
52     if not hasattr(socket, 'TCP_KEEPINTVL'):
53         socket.TCP_KEEPINTVL = 0x101
54     if not hasattr(socket, 'TCP_KEEPCNT'):
55         socket.TCP_KEEPCNT = 0x102
56
57
58 class KeepLocator(object):
59     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
60     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
61
62     def __init__(self, locator_str):
63         self.hints = []
64         self._perm_sig = None
65         self._perm_expiry = None
66         pieces = iter(locator_str.split('+'))
67         self.md5sum = next(pieces)
68         try:
69             self.size = int(next(pieces))
70         except StopIteration:
71             self.size = None
72         for hint in pieces:
73             if self.HINT_RE.match(hint) is None:
74                 raise ValueError("invalid hint format: {}".format(hint))
75             elif hint.startswith('A'):
76                 self.parse_permission_hint(hint)
77             else:
78                 self.hints.append(hint)
79
80     def __str__(self):
81         return '+'.join(
82             native_str(s)
83             for s in [self.md5sum, self.size,
84                       self.permission_hint()] + self.hints
85             if s is not None)
86
87     def stripped(self):
88         if self.size is not None:
89             return "%s+%i" % (self.md5sum, self.size)
90         else:
91             return self.md5sum
92
93     def _make_hex_prop(name, length):
94         # Build and return a new property with the given name that
95         # must be a hex string of the given length.
96         data_name = '_{}'.format(name)
97         def getter(self):
98             return getattr(self, data_name)
99         def setter(self, hex_str):
100             if not arvados.util.is_hex(hex_str, length):
101                 raise ValueError("{} is not a {}-digit hex string: {!r}".
102                                  format(name, length, hex_str))
103             setattr(self, data_name, hex_str)
104         return property(getter, setter)
105
106     md5sum = _make_hex_prop('md5sum', 32)
107     perm_sig = _make_hex_prop('perm_sig', 40)
108
109     @property
110     def perm_expiry(self):
111         return self._perm_expiry
112
113     @perm_expiry.setter
114     def perm_expiry(self, value):
115         if not arvados.util.is_hex(value, 1, 8):
116             raise ValueError(
117                 "permission timestamp must be a hex Unix timestamp: {}".
118                 format(value))
119         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
120
121     def permission_hint(self):
122         data = [self.perm_sig, self.perm_expiry]
123         if None in data:
124             return None
125         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
126         return "A{}@{:08x}".format(*data)
127
128     def parse_permission_hint(self, s):
129         try:
130             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
131         except IndexError:
132             raise ValueError("bad permission hint {}".format(s))
133
134     def permission_expired(self, as_of_dt=None):
135         if self.perm_expiry is None:
136             return False
137         elif as_of_dt is None:
138             as_of_dt = datetime.datetime.now()
139         return self.perm_expiry <= as_of_dt
140
141
142 class Keep(object):
143     """Simple interface to a global KeepClient object.
144
145     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
146     own API client.  The global KeepClient will build an API client from the
147     current Arvados configuration, which may not match the one you built.
148     """
149     _last_key = None
150
151     @classmethod
152     def global_client_object(cls):
153         global global_client_object
154         # Previously, KeepClient would change its behavior at runtime based
155         # on these configuration settings.  We simulate that behavior here
156         # by checking the values and returning a new KeepClient if any of
157         # them have changed.
158         key = (config.get('ARVADOS_API_HOST'),
159                config.get('ARVADOS_API_TOKEN'),
160                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
161                config.get('ARVADOS_KEEP_PROXY'),
162                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
163                os.environ.get('KEEP_LOCAL_STORE'))
164         if (global_client_object is None) or (cls._last_key != key):
165             global_client_object = KeepClient()
166             cls._last_key = key
167         return global_client_object
168
169     @staticmethod
170     def get(locator, **kwargs):
171         return Keep.global_client_object().get(locator, **kwargs)
172
173     @staticmethod
174     def put(data, **kwargs):
175         return Keep.global_client_object().put(data, **kwargs)
176
177 class KeepBlockCache(object):
178     # Default RAM cache is 256MiB
179     def __init__(self, cache_max=(1024 * 1024 * 1024)):
180         self.cache_max = cache_max
181         self._cache = []
182         self._cache_lock = threading.Lock()
183
184     class CacheSlot(object):
185         __slots__ = ("locator", "ready", "content")
186
187         def __init__(self, locator):
188             self.locator = locator
189             self.ready = threading.Event()
190             self.content = None
191
192         def get(self):
193             self.ready.wait()
194             return self.content
195
196         def set(self, value):
197             self.content = value
198             self.ready.set()
199
200         def size(self):
201             if self.content is None:
202                 return 0
203             else:
204                 return len(self.content)
205
206     def cap_cache(self):
207         '''Cap the cache size to self.cache_max'''
208         with self._cache_lock:
209             # Select all slots except those where ready.is_set() and content is
210             # None (that means there was an error reading the block).
211             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
212             sm = sum([slot.size() for slot in self._cache])
213             while len(self._cache) > 0 and sm > self.cache_max:
214                 for i in range(len(self._cache)-1, -1, -1):
215                     if self._cache[i].ready.is_set():
216                         del self._cache[i]
217                         break
218                 sm = sum([slot.size() for slot in self._cache])
219
220     def _get(self, locator):
221         # Test if the locator is already in the cache
222         for i in range(0, len(self._cache)):
223             if self._cache[i].locator == locator:
224                 n = self._cache[i]
225                 if i != 0:
226                     # move it to the front
227                     del self._cache[i]
228                     self._cache.insert(0, n)
229                 return n
230         return None
231
232     def get(self, locator):
233         with self._cache_lock:
234             return self._get(locator)
235
236     def reserve_cache(self, locator):
237         '''Reserve a cache slot for the specified locator,
238         or return the existing slot.'''
239         with self._cache_lock:
240             n = self._get(locator)
241             if n:
242                 return n, False
243             else:
244                 # Add a new cache slot for the locator
245                 n = KeepBlockCache.CacheSlot(locator)
246                 self._cache.insert(0, n)
247                 return n, True
248
249 class Counter(object):
250     def __init__(self, v=0):
251         self._lk = threading.Lock()
252         self._val = v
253
254     def add(self, v):
255         with self._lk:
256             self._val += v
257
258     def get(self):
259         with self._lk:
260             return self._val
261
262
263 class KeepClient(object):
264
265     # Default Keep server connection timeout:  2 seconds
266     # Default Keep server read timeout:       256 seconds
267     # Default Keep server bandwidth minimum:  32768 bytes per second
268     # Default Keep proxy connection timeout:  20 seconds
269     # Default Keep proxy read timeout:        256 seconds
270     # Default Keep proxy bandwidth minimum:   32768 bytes per second
271     DEFAULT_TIMEOUT = (2, 256, 32768)
272     DEFAULT_PROXY_TIMEOUT = (20, 256, 32768)
273
274
275     class KeepService(object):
276         """Make requests to a single Keep service, and track results.
277
278         A KeepService is intended to last long enough to perform one
279         transaction (GET or PUT) against one Keep service. This can
280         involve calling either get() or put() multiple times in order
281         to retry after transient failures. However, calling both get()
282         and put() on a single instance -- or using the same instance
283         to access two different Keep services -- will not produce
284         sensible behavior.
285         """
286
287         HTTP_ERRORS = (
288             socket.error,
289             ssl.SSLError,
290             arvados.errors.HttpError,
291         )
292
293         def __init__(self, root, user_agent_pool=queue.LifoQueue(),
294                      upload_counter=None,
295                      download_counter=None,
296                      headers={},
297                      insecure=False):
298             self.root = root
299             self._user_agent_pool = user_agent_pool
300             self._result = {'error': None}
301             self._usable = True
302             self._session = None
303             self._socket = None
304             self.get_headers = {'Accept': 'application/octet-stream'}
305             self.get_headers.update(headers)
306             self.put_headers = headers
307             self.upload_counter = upload_counter
308             self.download_counter = download_counter
309             self.insecure = insecure
310
311         def usable(self):
312             """Is it worth attempting a request?"""
313             return self._usable
314
315         def finished(self):
316             """Did the request succeed or encounter permanent failure?"""
317             return self._result['error'] == False or not self._usable
318
319         def last_result(self):
320             return self._result
321
322         def _get_user_agent(self):
323             try:
324                 return self._user_agent_pool.get(block=False)
325             except queue.Empty:
326                 return pycurl.Curl()
327
328         def _put_user_agent(self, ua):
329             try:
330                 ua.reset()
331                 self._user_agent_pool.put(ua, block=False)
332             except:
333                 ua.close()
334
335         def _socket_open(self, *args, **kwargs):
336             if len(args) + len(kwargs) == 2:
337                 return self._socket_open_pycurl_7_21_5(*args, **kwargs)
338             else:
339                 return self._socket_open_pycurl_7_19_3(*args, **kwargs)
340
341         def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
342             return self._socket_open_pycurl_7_21_5(
343                 purpose=None,
344                 address=collections.namedtuple(
345                     'Address', ['family', 'socktype', 'protocol', 'addr'],
346                 )(family, socktype, protocol, address))
347
348         def _socket_open_pycurl_7_21_5(self, purpose, address):
349             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
350             s = socket.socket(address.family, address.socktype, address.protocol)
351             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
352             # Will throw invalid protocol error on mac. This test prevents that.
353             if hasattr(socket, 'TCP_KEEPIDLE'):
354                 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
355             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
356             self._socket = s
357             return s
358
359         def get(self, locator, method="GET", timeout=None):
360             # locator is a KeepLocator object.
361             url = self.root + str(locator)
362             _logger.debug("Request: %s %s", method, url)
363             curl = self._get_user_agent()
364             ok = None
365             try:
366                 with timer.Timer() as t:
367                     self._headers = {}
368                     response_body = BytesIO()
369                     curl.setopt(pycurl.NOSIGNAL, 1)
370                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
371                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
372                     curl.setopt(pycurl.URL, url.encode('utf-8'))
373                     curl.setopt(pycurl.HTTPHEADER, [
374                         '{}: {}'.format(k,v) for k,v in self.get_headers.items()])
375                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
376                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
377                     if self.insecure:
378                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
379                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
380                     else:
381                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
382                     if method == "HEAD":
383                         curl.setopt(pycurl.NOBODY, True)
384                     self._setcurltimeouts(curl, timeout, method=="HEAD")
385
386                     try:
387                         curl.perform()
388                     except Exception as e:
389                         raise arvados.errors.HttpError(0, str(e))
390                     finally:
391                         if self._socket:
392                             self._socket.close()
393                             self._socket = None
394                     self._result = {
395                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
396                         'body': response_body.getvalue(),
397                         'headers': self._headers,
398                         'error': False,
399                     }
400
401                 ok = retry.check_http_response_success(self._result['status_code'])
402                 if not ok:
403                     self._result['error'] = arvados.errors.HttpError(
404                         self._result['status_code'],
405                         self._headers.get('x-status-line', 'Error'))
406             except self.HTTP_ERRORS as e:
407                 self._result = {
408                     'error': e,
409                 }
410             self._usable = ok != False
411             if self._result.get('status_code', None):
412                 # The client worked well enough to get an HTTP status
413                 # code, so presumably any problems are just on the
414                 # server side and it's OK to reuse the client.
415                 self._put_user_agent(curl)
416             else:
417                 # Don't return this client to the pool, in case it's
418                 # broken.
419                 curl.close()
420             if not ok:
421                 _logger.debug("Request fail: GET %s => %s: %s",
422                               url, type(self._result['error']), str(self._result['error']))
423                 return None
424             if method == "HEAD":
425                 _logger.info("HEAD %s: %s bytes",
426                          self._result['status_code'],
427                          self._result.get('content-length'))
428                 if self._result['headers'].get('x-keep-locator'):
429                     # This is a response to a remote block copy request, return
430                     # the local copy block locator.
431                     return self._result['headers'].get('x-keep-locator')
432                 return True
433
434             _logger.info("GET %s: %s bytes in %s msec (%.3f MiB/sec)",
435                          self._result['status_code'],
436                          len(self._result['body']),
437                          t.msecs,
438                          1.0*len(self._result['body'])/2**20/t.secs if t.secs > 0 else 0)
439
440             if self.download_counter:
441                 self.download_counter.add(len(self._result['body']))
442             resp_md5 = hashlib.md5(self._result['body']).hexdigest()
443             if resp_md5 != locator.md5sum:
444                 _logger.warning("Checksum fail: md5(%s) = %s",
445                                 url, resp_md5)
446                 self._result['error'] = arvados.errors.HttpError(
447                     0, 'Checksum fail')
448                 return None
449             return self._result['body']
450
451         def put(self, hash_s, body, timeout=None, headers={}):
452             put_headers = copy.copy(self.put_headers)
453             put_headers.update(headers)
454             url = self.root + hash_s
455             _logger.debug("Request: PUT %s", url)
456             curl = self._get_user_agent()
457             ok = None
458             try:
459                 with timer.Timer() as t:
460                     self._headers = {}
461                     body_reader = BytesIO(body)
462                     response_body = BytesIO()
463                     curl.setopt(pycurl.NOSIGNAL, 1)
464                     curl.setopt(pycurl.OPENSOCKETFUNCTION,
465                                 lambda *args, **kwargs: self._socket_open(*args, **kwargs))
466                     curl.setopt(pycurl.URL, url.encode('utf-8'))
467                     # Using UPLOAD tells cURL to wait for a "go ahead" from the
468                     # Keep server (in the form of a HTTP/1.1 "100 Continue"
469                     # response) instead of sending the request body immediately.
470                     # This allows the server to reject the request if the request
471                     # is invalid or the server is read-only, without waiting for
472                     # the client to send the entire block.
473                     curl.setopt(pycurl.UPLOAD, True)
474                     curl.setopt(pycurl.INFILESIZE, len(body))
475                     curl.setopt(pycurl.READFUNCTION, body_reader.read)
476                     curl.setopt(pycurl.HTTPHEADER, [
477                         '{}: {}'.format(k,v) for k,v in put_headers.items()])
478                     curl.setopt(pycurl.WRITEFUNCTION, response_body.write)
479                     curl.setopt(pycurl.HEADERFUNCTION, self._headerfunction)
480                     if self.insecure:
481                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
482                         curl.setopt(pycurl.SSL_VERIFYHOST, 0)
483                     else:
484                         curl.setopt(pycurl.CAINFO, arvados.util.ca_certs_path())
485                     self._setcurltimeouts(curl, timeout)
486                     try:
487                         curl.perform()
488                     except Exception as e:
489                         raise arvados.errors.HttpError(0, str(e))
490                     finally:
491                         if self._socket:
492                             self._socket.close()
493                             self._socket = None
494                     self._result = {
495                         'status_code': curl.getinfo(pycurl.RESPONSE_CODE),
496                         'body': response_body.getvalue().decode('utf-8'),
497                         'headers': self._headers,
498                         'error': False,
499                     }
500                 ok = retry.check_http_response_success(self._result['status_code'])
501                 if not ok:
502                     self._result['error'] = arvados.errors.HttpError(
503                         self._result['status_code'],
504                         self._headers.get('x-status-line', 'Error'))
505             except self.HTTP_ERRORS as e:
506                 self._result = {
507                     'error': e,
508                 }
509             self._usable = ok != False # still usable if ok is True or None
510             if self._result.get('status_code', None):
511                 # Client is functional. See comment in get().
512                 self._put_user_agent(curl)
513             else:
514                 curl.close()
515             if not ok:
516                 _logger.debug("Request fail: PUT %s => %s: %s",
517                               url, type(self._result['error']), str(self._result['error']))
518                 return False
519             _logger.info("PUT %s: %s bytes in %s msec (%.3f MiB/sec)",
520                          self._result['status_code'],
521                          len(body),
522                          t.msecs,
523                          1.0*len(body)/2**20/t.secs if t.secs > 0 else 0)
524             if self.upload_counter:
525                 self.upload_counter.add(len(body))
526             return True
527
528         def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
529             if not timeouts:
530                 return
531             elif isinstance(timeouts, tuple):
532                 if len(timeouts) == 2:
533                     conn_t, xfer_t = timeouts
534                     bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
535                 else:
536                     conn_t, xfer_t, bandwidth_bps = timeouts
537             else:
538                 conn_t, xfer_t = (timeouts, timeouts)
539                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
540             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
541             if not ignore_bandwidth:
542                 curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
543                 curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
544
545         def _headerfunction(self, header_line):
546             if isinstance(header_line, bytes):
547                 header_line = header_line.decode('iso-8859-1')
548             if ':' in header_line:
549                 name, value = header_line.split(':', 1)
550                 name = name.strip().lower()
551                 value = value.strip()
552             elif self._headers:
553                 name = self._lastheadername
554                 value = self._headers[name] + ' ' + header_line.strip()
555             elif header_line.startswith('HTTP/'):
556                 name = 'x-status-line'
557                 value = header_line
558             else:
559                 _logger.error("Unexpected header line: %s", header_line)
560                 return
561             self._lastheadername = name
562             self._headers[name] = value
563             # Returning None implies all bytes were written
564
565
566     class KeepWriterQueue(queue.Queue):
567         def __init__(self, copies, classes=[]):
568             queue.Queue.__init__(self) # Old-style superclass
569             self.wanted_copies = copies
570             self.wanted_storage_classes = classes
571             self.successful_copies = 0
572             self.confirmed_storage_classes = {}
573             self.response = None
574             self.storage_classes_tracking = True
575             self.queue_data_lock = threading.RLock()
576             self.pending_tries = max(copies, len(classes))
577             self.pending_tries_notification = threading.Condition()
578
579         def write_success(self, response, replicas_nr, classes_confirmed):
580             with self.queue_data_lock:
581                 self.successful_copies += replicas_nr
582                 if classes_confirmed is None:
583                     self.storage_classes_tracking = False
584                 elif self.storage_classes_tracking:
585                     for st_class, st_copies in classes_confirmed.items():
586                         try:
587                             self.confirmed_storage_classes[st_class] += st_copies
588                         except KeyError:
589                             self.confirmed_storage_classes[st_class] = st_copies
590                     self.pending_tries = max(self.wanted_copies - self.successful_copies, len(self.pending_classes()))
591                 self.response = response
592             with self.pending_tries_notification:
593                 self.pending_tries_notification.notify_all()
594
595         def write_fail(self, ks):
596             with self.pending_tries_notification:
597                 self.pending_tries += 1
598                 self.pending_tries_notification.notify()
599
600         def pending_copies(self):
601             with self.queue_data_lock:
602                 return self.wanted_copies - self.successful_copies
603
604         def satisfied_classes(self):
605             with self.queue_data_lock:
606                 if not self.storage_classes_tracking:
607                     # Notifies disabled storage classes expectation to
608                     # the outer loop.
609                     return None
610             return list(set(self.wanted_storage_classes) - set(self.pending_classes()))
611
612         def pending_classes(self):
613             with self.queue_data_lock:
614                 if (not self.storage_classes_tracking) or (self.wanted_storage_classes is None):
615                     return []
616                 unsatisfied_classes = copy.copy(self.wanted_storage_classes)
617                 for st_class, st_copies in self.confirmed_storage_classes.items():
618                     if st_class in unsatisfied_classes and st_copies >= self.wanted_copies:
619                         unsatisfied_classes.remove(st_class)
620                 return unsatisfied_classes
621
622         def get_next_task(self):
623             with self.pending_tries_notification:
624                 while True:
625                     if self.pending_copies() < 1 and len(self.pending_classes()) == 0:
626                         # This notify_all() is unnecessary --
627                         # write_success() already called notify_all()
628                         # when pending<1 became true, so it's not
629                         # possible for any other thread to be in
630                         # wait() now -- but it's cheap insurance
631                         # against deadlock so we do it anyway:
632                         self.pending_tries_notification.notify_all()
633                         # Drain the queue and then raise Queue.Empty
634                         while True:
635                             self.get_nowait()
636                             self.task_done()
637                     elif self.pending_tries > 0:
638                         service, service_root = self.get_nowait()
639                         if service.finished():
640                             self.task_done()
641                             continue
642                         self.pending_tries -= 1
643                         return service, service_root
644                     elif self.empty():
645                         self.pending_tries_notification.notify_all()
646                         raise queue.Empty
647                     else:
648                         self.pending_tries_notification.wait()
649
650
651     class KeepWriterThreadPool(object):
652         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None, classes=[]):
653             self.total_task_nr = 0
654             if (not max_service_replicas) or (max_service_replicas >= copies):
655                 num_threads = 1
656             else:
657                 num_threads = int(math.ceil(1.0*copies/max_service_replicas))
658             _logger.debug("Pool max threads is %d", num_threads)
659             self.workers = []
660             self.queue = KeepClient.KeepWriterQueue(copies, classes)
661             # Create workers
662             for _ in range(num_threads):
663                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
664                 self.workers.append(w)
665
666         def add_task(self, ks, service_root):
667             self.queue.put((ks, service_root))
668             self.total_task_nr += 1
669
670         def done(self):
671             return self.queue.successful_copies, self.queue.satisfied_classes()
672
673         def join(self):
674             # Start workers
675             for worker in self.workers:
676                 worker.start()
677             # Wait for finished work
678             self.queue.join()
679
680         def response(self):
681             return self.queue.response
682
683
684     class KeepWriterThread(threading.Thread):
685         class TaskFailed(RuntimeError): pass
686
687         def __init__(self, queue, data, data_hash, timeout=None):
688             super(KeepClient.KeepWriterThread, self).__init__()
689             self.timeout = timeout
690             self.queue = queue
691             self.data = data
692             self.data_hash = data_hash
693             self.daemon = True
694
695         def run(self):
696             while True:
697                 try:
698                     service, service_root = self.queue.get_next_task()
699                 except queue.Empty:
700                     return
701                 try:
702                     locator, copies, classes = self.do_task(service, service_root)
703                 except Exception as e:
704                     if not isinstance(e, self.TaskFailed):
705                         _logger.exception("Exception in KeepWriterThread")
706                     self.queue.write_fail(service)
707                 else:
708                     self.queue.write_success(locator, copies, classes)
709                 finally:
710                     self.queue.task_done()
711
712         def do_task(self, service, service_root):
713             classes = self.queue.pending_classes()
714             headers = {}
715             if len(classes) > 0:
716                 classes.sort()
717                 headers['X-Keep-Storage-Classes'] = ', '.join(classes)
718             success = bool(service.put(self.data_hash,
719                                         self.data,
720                                         timeout=self.timeout,
721                                         headers=headers))
722             result = service.last_result()
723
724             if not success:
725                 if result.get('status_code'):
726                     _logger.debug("Request fail: PUT %s => %s %s",
727                                   self.data_hash,
728                                   result.get('status_code'),
729                                   result.get('body'))
730                 raise self.TaskFailed()
731
732             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
733                           str(threading.current_thread()),
734                           self.data_hash,
735                           len(self.data),
736                           service_root)
737             try:
738                 replicas_stored = int(result['headers']['x-keep-replicas-stored'])
739             except (KeyError, ValueError):
740                 replicas_stored = 1
741
742             classes_confirmed = {}
743             try:
744                 scch = result['headers']['x-keep-storage-classes-confirmed']
745                 for confirmation in scch.replace(' ', '').split(','):
746                     if '=' in confirmation:
747                         stored_class, stored_copies = confirmation.split('=')[:2]
748                         classes_confirmed[stored_class] = int(stored_copies)
749             except (KeyError, ValueError):
750                 # Storage classes confirmed header missing or corrupt
751                 classes_confirmed = None
752
753             return result['body'].strip(), replicas_stored, classes_confirmed
754
755
756     def __init__(self, api_client=None, proxy=None,
757                  timeout=DEFAULT_TIMEOUT, proxy_timeout=DEFAULT_PROXY_TIMEOUT,
758                  api_token=None, local_store=None, block_cache=None,
759                  num_retries=0, session=None):
760         """Initialize a new KeepClient.
761
762         Arguments:
763         :api_client:
764           The API client to use to find Keep services.  If not
765           provided, KeepClient will build one from available Arvados
766           configuration.
767
768         :proxy:
769           If specified, this KeepClient will send requests to this Keep
770           proxy.  Otherwise, KeepClient will fall back to the setting of the
771           ARVADOS_KEEP_SERVICES or ARVADOS_KEEP_PROXY configuration settings.
772           If you want to KeepClient does not use a proxy, pass in an empty
773           string.
774
775         :timeout:
776           The initial timeout (in seconds) for HTTP requests to Keep
777           non-proxy servers.  A tuple of three floats is interpreted as
778           (connection_timeout, read_timeout, minimum_bandwidth). A connection
779           will be aborted if the average traffic rate falls below
780           minimum_bandwidth bytes per second over an interval of read_timeout
781           seconds. Because timeouts are often a result of transient server
782           load, the actual connection timeout will be increased by a factor
783           of two on each retry.
784           Default: (2, 256, 32768).
785
786         :proxy_timeout:
787           The initial timeout (in seconds) for HTTP requests to
788           Keep proxies. A tuple of three floats is interpreted as
789           (connection_timeout, read_timeout, minimum_bandwidth). The behavior
790           described above for adjusting connection timeouts on retry also
791           applies.
792           Default: (20, 256, 32768).
793
794         :api_token:
795           If you're not using an API client, but only talking
796           directly to a Keep proxy, this parameter specifies an API token
797           to authenticate Keep requests.  It is an error to specify both
798           api_client and api_token.  If you specify neither, KeepClient
799           will use one available from the Arvados configuration.
800
801         :local_store:
802           If specified, this KeepClient will bypass Keep
803           services, and save data to the named directory.  If unspecified,
804           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
805           environment variable.  If you want to ensure KeepClient does not
806           use local storage, pass in an empty string.  This is primarily
807           intended to mock a server for testing.
808
809         :num_retries:
810           The default number of times to retry failed requests.
811           This will be used as the default num_retries value when get() and
812           put() are called.  Default 0.
813         """
814         self.lock = threading.Lock()
815         if proxy is None:
816             if config.get('ARVADOS_KEEP_SERVICES'):
817                 proxy = config.get('ARVADOS_KEEP_SERVICES')
818             else:
819                 proxy = config.get('ARVADOS_KEEP_PROXY')
820         if api_token is None:
821             if api_client is None:
822                 api_token = config.get('ARVADOS_API_TOKEN')
823             else:
824                 api_token = api_client.api_token
825         elif api_client is not None:
826             raise ValueError(
827                 "can't build KeepClient with both API client and token")
828         if local_store is None:
829             local_store = os.environ.get('KEEP_LOCAL_STORE')
830
831         if api_client is None:
832             self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
833         else:
834             self.insecure = api_client.insecure
835
836         self.block_cache = block_cache if block_cache else KeepBlockCache()
837         self.timeout = timeout
838         self.proxy_timeout = proxy_timeout
839         self._user_agent_pool = queue.LifoQueue()
840         self.upload_counter = Counter()
841         self.download_counter = Counter()
842         self.put_counter = Counter()
843         self.get_counter = Counter()
844         self.hits_counter = Counter()
845         self.misses_counter = Counter()
846         self._storage_classes_unsupported_warning = False
847         self._default_classes = []
848
849         if local_store:
850             self.local_store = local_store
851             self.head = self.local_store_head
852             self.get = self.local_store_get
853             self.put = self.local_store_put
854         else:
855             self.num_retries = num_retries
856             self.max_replicas_per_service = None
857             if proxy:
858                 proxy_uris = proxy.split()
859                 for i in range(len(proxy_uris)):
860                     if not proxy_uris[i].endswith('/'):
861                         proxy_uris[i] += '/'
862                     # URL validation
863                     url = urllib.parse.urlparse(proxy_uris[i])
864                     if not (url.scheme and url.netloc):
865                         raise arvados.errors.ArgumentError("Invalid proxy URI: {}".format(proxy_uris[i]))
866                 self.api_token = api_token
867                 self._gateway_services = {}
868                 self._keep_services = [{
869                     'uuid': "00000-bi6l4-%015d" % idx,
870                     'service_type': 'proxy',
871                     '_service_root': uri,
872                     } for idx, uri in enumerate(proxy_uris)]
873                 self._writable_services = self._keep_services
874                 self.using_proxy = True
875                 self._static_services_list = True
876             else:
877                 # It's important to avoid instantiating an API client
878                 # unless we actually need one, for testing's sake.
879                 if api_client is None:
880                     api_client = arvados.api('v1')
881                 self.api_client = api_client
882                 self.api_token = api_client.api_token
883                 self._gateway_services = {}
884                 self._keep_services = None
885                 self._writable_services = None
886                 self.using_proxy = None
887                 self._static_services_list = False
888                 try:
889                     self._default_classes = [
890                         k for k, v in self.api_client.config()['StorageClasses'].items() if v['Default']]
891                 except KeyError:
892                     # We're talking to an old cluster
893                     pass
894
895     def current_timeout(self, attempt_number):
896         """Return the appropriate timeout to use for this client.
897
898         The proxy timeout setting if the backend service is currently a proxy,
899         the regular timeout setting otherwise.  The `attempt_number` indicates
900         how many times the operation has been tried already (starting from 0
901         for the first try), and scales the connection timeout portion of the
902         return value accordingly.
903
904         """
905         # TODO(twp): the timeout should be a property of a
906         # KeepService, not a KeepClient. See #4488.
907         t = self.proxy_timeout if self.using_proxy else self.timeout
908         if len(t) == 2:
909             return (t[0] * (1 << attempt_number), t[1])
910         else:
911             return (t[0] * (1 << attempt_number), t[1], t[2])
912     def _any_nondisk_services(self, service_list):
913         return any(ks.get('service_type', 'disk') != 'disk'
914                    for ks in service_list)
915
916     def build_services_list(self, force_rebuild=False):
917         if (self._static_services_list or
918               (self._keep_services and not force_rebuild)):
919             return
920         with self.lock:
921             try:
922                 keep_services = self.api_client.keep_services().accessible()
923             except Exception:  # API server predates Keep services.
924                 keep_services = self.api_client.keep_disks().list()
925
926             # Gateway services are only used when specified by UUID,
927             # so there's nothing to gain by filtering them by
928             # service_type.
929             self._gateway_services = {ks['uuid']: ks for ks in
930                                       keep_services.execute()['items']}
931             if not self._gateway_services:
932                 raise arvados.errors.NoKeepServersError()
933
934             # Precompute the base URI for each service.
935             for r in self._gateway_services.values():
936                 host = r['service_host']
937                 if not host.startswith('[') and host.find(':') >= 0:
938                     # IPv6 URIs must be formatted like http://[::1]:80/...
939                     host = '[' + host + ']'
940                 r['_service_root'] = "{}://{}:{:d}/".format(
941                     'https' if r['service_ssl_flag'] else 'http',
942                     host,
943                     r['service_port'])
944
945             _logger.debug(str(self._gateway_services))
946             self._keep_services = [
947                 ks for ks in self._gateway_services.values()
948                 if not ks.get('service_type', '').startswith('gateway:')]
949             self._writable_services = [ks for ks in self._keep_services
950                                        if not ks.get('read_only')]
951
952             # For disk type services, max_replicas_per_service is 1
953             # It is unknown (unlimited) for other service types.
954             if self._any_nondisk_services(self._writable_services):
955                 self.max_replicas_per_service = None
956             else:
957                 self.max_replicas_per_service = 1
958
959     def _service_weight(self, data_hash, service_uuid):
960         """Compute the weight of a Keep service endpoint for a data
961         block with a known hash.
962
963         The weight is md5(h + u) where u is the last 15 characters of
964         the service endpoint's UUID.
965         """
966         return hashlib.md5((data_hash + service_uuid[-15:]).encode()).hexdigest()
967
968     def weighted_service_roots(self, locator, force_rebuild=False, need_writable=False):
969         """Return an array of Keep service endpoints, in the order in
970         which they should be probed when reading or writing data with
971         the given hash+hints.
972         """
973         self.build_services_list(force_rebuild)
974
975         sorted_roots = []
976         # Use the services indicated by the given +K@... remote
977         # service hints, if any are present and can be resolved to a
978         # URI.
979         for hint in locator.hints:
980             if hint.startswith('K@'):
981                 if len(hint) == 7:
982                     sorted_roots.append(
983                         "https://keep.{}.arvadosapi.com/".format(hint[2:]))
984                 elif len(hint) == 29:
985                     svc = self._gateway_services.get(hint[2:])
986                     if svc:
987                         sorted_roots.append(svc['_service_root'])
988
989         # Sort the available local services by weight (heaviest first)
990         # for this locator, and return their service_roots (base URIs)
991         # in that order.
992         use_services = self._keep_services
993         if need_writable:
994             use_services = self._writable_services
995         self.using_proxy = self._any_nondisk_services(use_services)
996         sorted_roots.extend([
997             svc['_service_root'] for svc in sorted(
998                 use_services,
999                 reverse=True,
1000                 key=lambda svc: self._service_weight(locator.md5sum, svc['uuid']))])
1001         _logger.debug("{}: {}".format(locator, sorted_roots))
1002         return sorted_roots
1003
1004     def map_new_services(self, roots_map, locator, force_rebuild, need_writable, headers):
1005         # roots_map is a dictionary, mapping Keep service root strings
1006         # to KeepService objects.  Poll for Keep services, and add any
1007         # new ones to roots_map.  Return the current list of local
1008         # root strings.
1009         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
1010         local_roots = self.weighted_service_roots(locator, force_rebuild, need_writable)
1011         for root in local_roots:
1012             if root not in roots_map:
1013                 roots_map[root] = self.KeepService(
1014                     root, self._user_agent_pool,
1015                     upload_counter=self.upload_counter,
1016                     download_counter=self.download_counter,
1017                     headers=headers,
1018                     insecure=self.insecure)
1019         return local_roots
1020
1021     @staticmethod
1022     def _check_loop_result(result):
1023         # KeepClient RetryLoops should save results as a 2-tuple: the
1024         # actual result of the request, and the number of servers available
1025         # to receive the request this round.
1026         # This method returns True if there's a real result, False if
1027         # there are no more servers available, otherwise None.
1028         if isinstance(result, Exception):
1029             return None
1030         result, tried_server_count = result
1031         if (result is not None) and (result is not False):
1032             return True
1033         elif tried_server_count < 1:
1034             _logger.info("No more Keep services to try; giving up")
1035             return False
1036         else:
1037             return None
1038
1039     def get_from_cache(self, loc_s):
1040         """Fetch a block only if is in the cache, otherwise return None."""
1041         locator = KeepLocator(loc_s)
1042         slot = self.block_cache.get(locator.md5sum)
1043         if slot is not None and slot.ready.is_set():
1044             return slot.get()
1045         else:
1046             return None
1047
1048     def has_cache_slot(self, loc_s):
1049         locator = KeepLocator(loc_s)
1050         return self.block_cache.get(locator.md5sum) is not None
1051
1052     def refresh_signature(self, loc):
1053         """Ask Keep to get the remote block and return its local signature"""
1054         now = datetime.datetime.utcnow().isoformat("T") + 'Z'
1055         return self.head(loc, headers={'X-Keep-Signature': 'local, {}'.format(now)})
1056
1057     @retry.retry_method
1058     def head(self, loc_s, **kwargs):
1059         return self._get_or_head(loc_s, method="HEAD", **kwargs)
1060
1061     @retry.retry_method
1062     def get(self, loc_s, **kwargs):
1063         return self._get_or_head(loc_s, method="GET", **kwargs)
1064
1065     def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
1066         """Get data from Keep.
1067
1068         This method fetches one or more blocks of data from Keep.  It
1069         sends a request each Keep service registered with the API
1070         server (or the proxy provided when this client was
1071         instantiated), then each service named in location hints, in
1072         sequence.  As soon as one service provides the data, it's
1073         returned.
1074
1075         Arguments:
1076         * loc_s: A string of one or more comma-separated locators to fetch.
1077           This method returns the concatenation of these blocks.
1078         * num_retries: The number of times to retry GET requests to
1079           *each* Keep server if it returns temporary failures, with
1080           exponential backoff.  Note that, in each loop, the method may try
1081           to fetch data from every available Keep service, along with any
1082           that are named in location hints in the locator.  The default value
1083           is set when the KeepClient is initialized.
1084         """
1085         if ',' in loc_s:
1086             return ''.join(self.get(x) for x in loc_s.split(','))
1087
1088         self.get_counter.add(1)
1089
1090         request_id = (request_id or
1091                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1092                       arvados.util.new_request_id())
1093         if headers is None:
1094             headers = {}
1095         headers['X-Request-Id'] = request_id
1096
1097         slot = None
1098         blob = None
1099         try:
1100             locator = KeepLocator(loc_s)
1101             if method == "GET":
1102                 slot, first = self.block_cache.reserve_cache(locator.md5sum)
1103                 if not first:
1104                     self.hits_counter.add(1)
1105                     blob = slot.get()
1106                     if blob is None:
1107                         raise arvados.errors.KeepReadError(
1108                             "failed to read {}".format(loc_s))
1109                     return blob
1110
1111             self.misses_counter.add(1)
1112
1113             # If the locator has hints specifying a prefix (indicating a
1114             # remote keepproxy) or the UUID of a local gateway service,
1115             # read data from the indicated service(s) instead of the usual
1116             # list of local disk services.
1117             hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
1118                           for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
1119             hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
1120                                for hint in locator.hints if (
1121                                        hint.startswith('K@') and
1122                                        len(hint) == 29 and
1123                                        self._gateway_services.get(hint[2:])
1124                                        )])
1125             # Map root URLs to their KeepService objects.
1126             roots_map = {
1127                 root: self.KeepService(root, self._user_agent_pool,
1128                                        upload_counter=self.upload_counter,
1129                                        download_counter=self.download_counter,
1130                                        headers=headers,
1131                                        insecure=self.insecure)
1132                 for root in hint_roots
1133             }
1134
1135             # See #3147 for a discussion of the loop implementation.  Highlights:
1136             # * Refresh the list of Keep services after each failure, in case
1137             #   it's being updated.
1138             # * Retry until we succeed, we're out of retries, or every available
1139             #   service has returned permanent failure.
1140             sorted_roots = []
1141             roots_map = {}
1142             loop = retry.RetryLoop(num_retries, self._check_loop_result,
1143                                    backoff_start=2)
1144             for tries_left in loop:
1145                 try:
1146                     sorted_roots = self.map_new_services(
1147                         roots_map, locator,
1148                         force_rebuild=(tries_left < num_retries),
1149                         need_writable=False,
1150                         headers=headers)
1151                 except Exception as error:
1152                     loop.save_result(error)
1153                     continue
1154
1155                 # Query KeepService objects that haven't returned
1156                 # permanent failure, in our specified shuffle order.
1157                 services_to_try = [roots_map[root]
1158                                    for root in sorted_roots
1159                                    if roots_map[root].usable()]
1160                 for keep_service in services_to_try:
1161                     blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
1162                     if blob is not None:
1163                         break
1164                 loop.save_result((blob, len(services_to_try)))
1165
1166             # Always cache the result, then return it if we succeeded.
1167             if loop.success():
1168                 return blob
1169         finally:
1170             if slot is not None:
1171                 slot.set(blob)
1172                 self.block_cache.cap_cache()
1173
1174         # Q: Including 403 is necessary for the Keep tests to continue
1175         # passing, but maybe they should expect KeepReadError instead?
1176         not_founds = sum(1 for key in sorted_roots
1177                          if roots_map[key].last_result().get('status_code', None) in {403, 404, 410})
1178         service_errors = ((key, roots_map[key].last_result()['error'])
1179                           for key in sorted_roots)
1180         if not roots_map:
1181             raise arvados.errors.KeepReadError(
1182                 "[{}] failed to read {}: no Keep services available ({})".format(
1183                     request_id, loc_s, loop.last_result()))
1184         elif not_founds == len(sorted_roots):
1185             raise arvados.errors.NotFoundError(
1186                 "[{}] {} not found".format(request_id, loc_s), service_errors)
1187         else:
1188             raise arvados.errors.KeepReadError(
1189                 "[{}] failed to read {} after {}".format(request_id, loc_s, loop.attempts_str()), service_errors, label="service")
1190
1191     @retry.retry_method
1192     def put(self, data, copies=2, num_retries=None, request_id=None, classes=None):
1193         """Save data in Keep.
1194
1195         This method will get a list of Keep services from the API server, and
1196         send the data to each one simultaneously in a new thread.  Once the
1197         uploads are finished, if enough copies are saved, this method returns
1198         the most recent HTTP response body.  If requests fail to upload
1199         enough copies, this method raises KeepWriteError.
1200
1201         Arguments:
1202         * data: The string of data to upload.
1203         * copies: The number of copies that the user requires be saved.
1204           Default 2.
1205         * num_retries: The number of times to retry PUT requests to
1206           *each* Keep server if it returns temporary failures, with
1207           exponential backoff.  The default value is set when the
1208           KeepClient is initialized.
1209         * classes: An optional list of storage class names where copies should
1210           be written.
1211         """
1212
1213         classes = classes or self._default_classes
1214
1215         if not isinstance(data, bytes):
1216             data = data.encode()
1217
1218         self.put_counter.add(1)
1219
1220         data_hash = hashlib.md5(data).hexdigest()
1221         loc_s = data_hash + '+' + str(len(data))
1222         if copies < 1:
1223             return loc_s
1224         locator = KeepLocator(loc_s)
1225
1226         request_id = (request_id or
1227                       (hasattr(self, 'api_client') and self.api_client.request_id) or
1228                       arvados.util.new_request_id())
1229         headers = {
1230             'X-Request-Id': request_id,
1231             'X-Keep-Desired-Replicas': str(copies),
1232         }
1233         roots_map = {}
1234         loop = retry.RetryLoop(num_retries, self._check_loop_result,
1235                                backoff_start=2)
1236         done_copies = 0
1237         done_classes = []
1238         for tries_left in loop:
1239             try:
1240                 sorted_roots = self.map_new_services(
1241                     roots_map, locator,
1242                     force_rebuild=(tries_left < num_retries),
1243                     need_writable=True,
1244                     headers=headers)
1245             except Exception as error:
1246                 loop.save_result(error)
1247                 continue
1248
1249             pending_classes = []
1250             if done_classes is not None:
1251                 pending_classes = list(set(classes) - set(done_classes))
1252             writer_pool = KeepClient.KeepWriterThreadPool(data=data,
1253                                                         data_hash=data_hash,
1254                                                         copies=copies - done_copies,
1255                                                         max_service_replicas=self.max_replicas_per_service,
1256                                                         timeout=self.current_timeout(num_retries - tries_left),
1257                                                         classes=pending_classes)
1258             for service_root, ks in [(root, roots_map[root])
1259                                      for root in sorted_roots]:
1260                 if ks.finished():
1261                     continue
1262                 writer_pool.add_task(ks, service_root)
1263             writer_pool.join()
1264             pool_copies, pool_classes = writer_pool.done()
1265             done_copies += pool_copies
1266             if (done_classes is not None) and (pool_classes is not None):
1267                 done_classes += pool_classes
1268                 loop.save_result(
1269                     (done_copies >= copies and set(done_classes) == set(classes),
1270                     writer_pool.total_task_nr))
1271             else:
1272                 # Old keepstore contacted without storage classes support:
1273                 # success is determined only by successful copies.
1274                 #
1275                 # Disable storage classes tracking from this point forward.
1276                 if not self._storage_classes_unsupported_warning:
1277                     self._storage_classes_unsupported_warning = True
1278                     _logger.warning("X-Keep-Storage-Classes header not supported by the cluster")
1279                 done_classes = None
1280                 loop.save_result(
1281                     (done_copies >= copies, writer_pool.total_task_nr))
1282
1283         if loop.success():
1284             return writer_pool.response()
1285         if not roots_map:
1286             raise arvados.errors.KeepWriteError(
1287                 "[{}] failed to write {}: no Keep services available ({})".format(
1288                     request_id, data_hash, loop.last_result()))
1289         else:
1290             service_errors = ((key, roots_map[key].last_result()['error'])
1291                               for key in sorted_roots
1292                               if roots_map[key].last_result()['error'])
1293             raise arvados.errors.KeepWriteError(
1294                 "[{}] failed to write {} after {} (wanted {} copies but wrote {})".format(
1295                     request_id, data_hash, loop.attempts_str(), (copies, classes), writer_pool.done()), service_errors, label="service")
1296
1297     def local_store_put(self, data, copies=1, num_retries=None, classes=[]):
1298         """A stub for put().
1299
1300         This method is used in place of the real put() method when
1301         using local storage (see constructor's local_store argument).
1302
1303         copies and num_retries arguments are ignored: they are here
1304         only for the sake of offering the same call signature as
1305         put().
1306
1307         Data stored this way can be retrieved via local_store_get().
1308         """
1309         md5 = hashlib.md5(data).hexdigest()
1310         locator = '%s+%d' % (md5, len(data))
1311         with open(os.path.join(self.local_store, md5 + '.tmp'), 'wb') as f:
1312             f.write(data)
1313         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
1314                   os.path.join(self.local_store, md5))
1315         return locator
1316
1317     def local_store_get(self, loc_s, num_retries=None):
1318         """Companion to local_store_put()."""
1319         try:
1320             locator = KeepLocator(loc_s)
1321         except ValueError:
1322             raise arvados.errors.NotFoundError(
1323                 "Invalid data locator: '%s'" % loc_s)
1324         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1325             return b''
1326         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
1327             return f.read()
1328
1329     def local_store_head(self, loc_s, num_retries=None):
1330         """Companion to local_store_put()."""
1331         try:
1332             locator = KeepLocator(loc_s)
1333         except ValueError:
1334             raise arvados.errors.NotFoundError(
1335                 "Invalid data locator: '%s'" % loc_s)
1336         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
1337             return True
1338         if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
1339             return True
1340