Merge branch '4015-collection-chooser-portable-data-hash'
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import timer
21 import datetime
22 import ssl
23 import socket
24
25 _logger = logging.getLogger('arvados.keep')
26 global_client_object = None
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33
34 class KeepLocator(object):
35     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
36     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37
38     def __init__(self, locator_str):
39         self.hints = []
40         self._perm_sig = None
41         self._perm_expiry = None
42         pieces = iter(locator_str.split('+'))
43         self.md5sum = next(pieces)
44         try:
45             self.size = int(next(pieces))
46         except StopIteration:
47             self.size = None
48         for hint in pieces:
49             if self.HINT_RE.match(hint) is None:
50                 raise ValueError("unrecognized hint data {}".format(hint))
51             elif hint.startswith('A'):
52                 self.parse_permission_hint(hint)
53             else:
54                 self.hints.append(hint)
55
56     def __str__(self):
57         return '+'.join(
58             str(s) for s in [self.md5sum, self.size,
59                              self.permission_hint()] + self.hints
60             if s is not None)
61
62     def _make_hex_prop(name, length):
63         # Build and return a new property with the given name that
64         # must be a hex string of the given length.
65         data_name = '_{}'.format(name)
66         def getter(self):
67             return getattr(self, data_name)
68         def setter(self, hex_str):
69             if not arvados.util.is_hex(hex_str, length):
70                 raise ValueError("{} must be a {}-digit hex string: {}".
71                                  format(name, length, hex_str))
72             setattr(self, data_name, hex_str)
73         return property(getter, setter)
74
75     md5sum = _make_hex_prop('md5sum', 32)
76     perm_sig = _make_hex_prop('perm_sig', 40)
77
78     @property
79     def perm_expiry(self):
80         return self._perm_expiry
81
82     @perm_expiry.setter
83     def perm_expiry(self, value):
84         if not arvados.util.is_hex(value, 1, 8):
85             raise ValueError(
86                 "permission timestamp must be a hex Unix timestamp: {}".
87                 format(value))
88         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
89
90     def permission_hint(self):
91         data = [self.perm_sig, self.perm_expiry]
92         if None in data:
93             return None
94         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
95         return "A{}@{:08x}".format(*data)
96
97     def parse_permission_hint(self, s):
98         try:
99             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
100         except IndexError:
101             raise ValueError("bad permission hint {}".format(s))
102
103     def permission_expired(self, as_of_dt=None):
104         if self.perm_expiry is None:
105             return False
106         elif as_of_dt is None:
107             as_of_dt = datetime.datetime.now()
108         return self.perm_expiry <= as_of_dt
109
110
111 class Keep(object):
112     """Simple interface to a global KeepClient object.
113
114     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
115     own API client.  The global KeepClient will build an API client from the
116     current Arvados configuration, which may not match the one you built.
117     """
118     _last_key = None
119
120     @classmethod
121     def global_client_object(cls):
122         global global_client_object
123         # Previously, KeepClient would change its behavior at runtime based
124         # on these configuration settings.  We simulate that behavior here
125         # by checking the values and returning a new KeepClient if any of
126         # them have changed.
127         key = (config.get('ARVADOS_API_HOST'),
128                config.get('ARVADOS_API_TOKEN'),
129                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
130                config.get('ARVADOS_KEEP_PROXY'),
131                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
132                os.environ.get('KEEP_LOCAL_STORE'))
133         if (global_client_object is None) or (cls._last_key != key):
134             global_client_object = KeepClient()
135             cls._last_key = key
136         return global_client_object
137
138     @staticmethod
139     def get(locator, **kwargs):
140         return Keep.global_client_object().get(locator, **kwargs)
141
142     @staticmethod
143     def put(data, **kwargs):
144         return Keep.global_client_object().put(data, **kwargs)
145
146 class KeepBlockCache(object):
147     # Default RAM cache is 256MiB
148     def __init__(self, cache_max=(256 * 1024 * 1024)):
149         self.cache_max = cache_max
150         self._cache = []
151         self._cache_lock = threading.Lock()
152
153     class CacheSlot(object):
154         def __init__(self, locator):
155             self.locator = locator
156             self.ready = threading.Event()
157             self.content = None
158
159         def get(self):
160             self.ready.wait()
161             return self.content
162
163         def set(self, value):
164             self.content = value
165             self.ready.set()
166
167         def size(self):
168             if self.content == None:
169                 return 0
170             else:
171                 return len(self.content)
172
173     def cap_cache(self):
174         '''Cap the cache size to self.cache_max'''
175         self._cache_lock.acquire()
176         try:
177             # Select all slots except those where ready.is_set() and content is
178             # None (that means there was an error reading the block).
179             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
180             sm = sum([slot.size() for slot in self._cache])
181             while len(self._cache) > 0 and sm > self.cache_max:
182                 for i in xrange(len(self._cache)-1, -1, -1):
183                     if self._cache[i].ready.is_set():
184                         del self._cache[i]
185                         break
186                 sm = sum([slot.size() for slot in self._cache])
187         finally:
188             self._cache_lock.release()
189
190     def reserve_cache(self, locator):
191         '''Reserve a cache slot for the specified locator,
192         or return the existing slot.'''
193         self._cache_lock.acquire()
194         try:
195             # Test if the locator is already in the cache
196             for i in xrange(0, len(self._cache)):
197                 if self._cache[i].locator == locator:
198                     n = self._cache[i]
199                     if i != 0:
200                         # move it to the front
201                         del self._cache[i]
202                         self._cache.insert(0, n)
203                     return n, False
204
205             # Add a new cache slot for the locator
206             n = KeepBlockCache.CacheSlot(locator)
207             self._cache.insert(0, n)
208             return n, True
209         finally:
210             self._cache_lock.release()
211
212 class KeepClient(object):
213     class ThreadLimiter(object):
214         """
215         Limit the number of threads running at a given time to
216         {desired successes} minus {successes reported}. When successes
217         reported == desired, wake up the remaining threads and tell
218         them to quit.
219
220         Should be used in a "with" block.
221         """
222         def __init__(self, todo):
223             self._todo = todo
224             self._done = 0
225             self._response = None
226             self._todo_lock = threading.Semaphore(todo)
227             self._done_lock = threading.Lock()
228
229         def __enter__(self):
230             self._todo_lock.acquire()
231             return self
232
233         def __exit__(self, type, value, traceback):
234             self._todo_lock.release()
235
236         def shall_i_proceed(self):
237             """
238             Return true if the current thread should do stuff. Return
239             false if the current thread should just stop.
240             """
241             with self._done_lock:
242                 return (self._done < self._todo)
243
244         def save_response(self, response_body, replicas_stored):
245             """
246             Records a response body (a locator, possibly signed) returned by
247             the Keep server.  It is not necessary to save more than
248             one response, since we presume that any locator returned
249             in response to a successful request is valid.
250             """
251             with self._done_lock:
252                 self._done += replicas_stored
253                 self._response = response_body
254
255         def response(self):
256             """
257             Returns the body from the response to a PUT request.
258             """
259             with self._done_lock:
260                 return self._response
261
262         def done(self):
263             """
264             Return how many successes were reported.
265             """
266             with self._done_lock:
267                 return self._done
268
269
270     class KeepService(object):
271         # Make requests to a single Keep service, and track results.
272         HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
273                        socket.error, ssl.SSLError)
274
275         def __init__(self, root, **headers):
276             self.root = root
277             self.last_result = None
278             self.success_flag = None
279             self.get_headers = {'Accept': 'application/octet-stream'}
280             self.get_headers.update(headers)
281             self.put_headers = headers
282
283         def usable(self):
284             return self.success_flag is not False
285
286         def finished(self):
287             return self.success_flag is not None
288
289         def last_status(self):
290             try:
291                 return int(self.last_result[0].status)
292             except (AttributeError, IndexError, ValueError):
293                 return None
294
295         def get(self, http, locator):
296             # http is an httplib2.Http object.
297             # locator is a KeepLocator object.
298             url = self.root + str(locator)
299             _logger.debug("Request: GET %s", url)
300             try:
301                 with timer.Timer() as t:
302                     result = http.request(url.encode('utf-8'), 'GET',
303                                           headers=self.get_headers)
304             except self.HTTP_ERRORS as e:
305                 _logger.debug("Request fail: GET %s => %s: %s",
306                               url, type(e), str(e))
307                 self.last_result = e
308             else:
309                 self.last_result = result
310                 self.success_flag = retry.check_http_response_success(result)
311                 content = result[1]
312                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
313                              self.last_status(), len(content), t.msecs,
314                              (len(content)/(1024.0*1024))/t.secs)
315                 if self.success_flag:
316                     resp_md5 = hashlib.md5(content).hexdigest()
317                     if resp_md5 == locator.md5sum:
318                         return content
319                     _logger.warning("Checksum fail: md5(%s) = %s",
320                                     url, resp_md5)
321             return None
322
323         def put(self, http, hash_s, body):
324             url = self.root + hash_s
325             _logger.debug("Request: PUT %s", url)
326             try:
327                 result = http.request(url.encode('utf-8'), 'PUT',
328                                       headers=self.put_headers, body=body)
329             except self.HTTP_ERRORS as e:
330                 _logger.debug("Request fail: PUT %s => %s: %s",
331                               url, type(e), str(e))
332                 self.last_result = e
333             else:
334                 self.last_result = result
335                 self.success_flag = retry.check_http_response_success(result)
336             return self.success_flag
337
338
339     class KeepWriterThread(threading.Thread):
340         """
341         Write a blob of data to the given Keep server. On success, call
342         save_response() of the given ThreadLimiter to save the returned
343         locator.
344         """
345         def __init__(self, keep_service, **kwargs):
346             super(KeepClient.KeepWriterThread, self).__init__()
347             self.service = keep_service
348             self.args = kwargs
349             self._success = False
350
351         def success(self):
352             return self._success
353
354         def run(self):
355             with self.args['thread_limiter'] as limiter:
356                 if not limiter.shall_i_proceed():
357                     # My turn arrived, but the job has been done without
358                     # me.
359                     return
360                 self.run_with_limiter(limiter)
361
362         def run_with_limiter(self, limiter):
363             if self.service.finished():
364                 return
365             _logger.debug("KeepWriterThread %s proceeding %s %s",
366                           str(threading.current_thread()),
367                           self.args['data_hash'],
368                           self.args['service_root'])
369             h = httplib2.Http(timeout=self.args.get('timeout', None))
370             self._success = bool(self.service.put(
371                     h, self.args['data_hash'], self.args['data']))
372             status = self.service.last_status()
373             if self._success:
374                 resp, body = self.service.last_result
375                 _logger.debug("KeepWriterThread %s succeeded %s %s",
376                               str(threading.current_thread()),
377                               self.args['data_hash'],
378                               self.args['service_root'])
379                 # Tick the 'done' counter for the number of replica
380                 # reported stored by the server, for the case that
381                 # we're talking to a proxy or other backend that
382                 # stores to multiple copies for us.
383                 try:
384                     replicas_stored = int(resp['x-keep-replicas-stored'])
385                 except (KeyError, ValueError):
386                     replicas_stored = 1
387                 limiter.save_response(body.strip(), replicas_stored)
388             elif status is not None:
389                 _logger.debug("Request fail: PUT %s => %s %s",
390                               self.args['data_hash'], status,
391                               self.service.last_result[1])
392
393
394     def __init__(self, api_client=None, proxy=None, timeout=300,
395                  api_token=None, local_store=None, block_cache=None,
396                  num_retries=0):
397         """Initialize a new KeepClient.
398
399         Arguments:
400         * api_client: The API client to use to find Keep services.  If not
401           provided, KeepClient will build one from available Arvados
402           configuration.
403         * proxy: If specified, this KeepClient will send requests to this
404           Keep proxy.  Otherwise, KeepClient will fall back to the setting
405           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
406           ensure KeepClient does not use a proxy, pass in an empty string.
407         * timeout: The timeout for all HTTP requests, in seconds.  Default
408           300.
409         * api_token: If you're not using an API client, but only talking
410           directly to a Keep proxy, this parameter specifies an API token
411           to authenticate Keep requests.  It is an error to specify both
412           api_client and api_token.  If you specify neither, KeepClient
413           will use one available from the Arvados configuration.
414         * local_store: If specified, this KeepClient will bypass Keep
415           services, and save data to the named directory.  If unspecified,
416           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
417           environment variable.  If you want to ensure KeepClient does not
418           use local storage, pass in an empty string.  This is primarily
419           intended to mock a server for testing.
420         * num_retries: The default number of times to retry failed requests.
421           This will be used as the default num_retries value when get() and
422           put() are called.  Default 0.
423         """
424         self.lock = threading.Lock()
425         if proxy is None:
426             proxy = config.get('ARVADOS_KEEP_PROXY')
427         if api_token is None:
428             if api_client is None:
429                 api_token = config.get('ARVADOS_API_TOKEN')
430             else:
431                 api_token = api_client.api_token
432         elif api_client is not None:
433             raise ValueError(
434                 "can't build KeepClient with both API client and token")
435         if local_store is None:
436             local_store = os.environ.get('KEEP_LOCAL_STORE')
437
438         self.block_cache = block_cache if block_cache else KeepBlockCache()
439
440         if local_store:
441             self.local_store = local_store
442             self.get = self.local_store_get
443             self.put = self.local_store_put
444         else:
445             self.timeout = timeout
446             self.num_retries = num_retries
447             if proxy:
448                 if not proxy.endswith('/'):
449                     proxy += '/'
450                 self.api_token = api_token
451                 self.service_roots = [proxy]
452                 self.using_proxy = True
453                 self.static_service_roots = True
454             else:
455                 # It's important to avoid instantiating an API client
456                 # unless we actually need one, for testing's sake.
457                 if api_client is None:
458                     api_client = arvados.api('v1')
459                 self.api_client = api_client
460                 self.api_token = api_client.api_token
461                 self.service_roots = None
462                 self.using_proxy = None
463                 self.static_service_roots = False
464
465     def build_service_roots(self, force_rebuild=False):
466         if (self.static_service_roots or
467               (self.service_roots and not force_rebuild)):
468             return
469         with self.lock:
470             try:
471                 keep_services = self.api_client.keep_services().accessible()
472             except Exception:  # API server predates Keep services.
473                 keep_services = self.api_client.keep_disks().list()
474
475             keep_services = keep_services.execute().get('items')
476             if not keep_services:
477                 raise arvados.errors.NoKeepServersError()
478
479             self.using_proxy = any(ks.get('service_type') == 'proxy'
480                                    for ks in keep_services)
481
482             roots = ("{}://[{}]:{:d}/".format(
483                         'https' if ks['service_ssl_flag'] else 'http',
484                          ks['service_host'],
485                          ks['service_port'])
486                      for ks in keep_services)
487             self.service_roots = sorted(set(roots))
488             _logger.debug(str(self.service_roots))
489
490     def shuffled_service_roots(self, hash, force_rebuild=False):
491         self.build_service_roots(force_rebuild)
492
493         # Build an ordering with which to query the Keep servers based on the
494         # contents of the hash.
495         # "hash" is a hex-encoded number at least 8 digits
496         # (32 bits) long
497
498         # seed used to calculate the next keep server from 'pool'
499         # to be added to 'pseq'
500         seed = hash
501
502         # Keep servers still to be added to the ordering
503         pool = self.service_roots[:]
504
505         # output probe sequence
506         pseq = []
507
508         # iterate while there are servers left to be assigned
509         while len(pool) > 0:
510             if len(seed) < 8:
511                 # ran out of digits in the seed
512                 if len(pseq) < len(hash) / 4:
513                     # the number of servers added to the probe sequence is less
514                     # than the number of 4-digit slices in 'hash' so refill the
515                     # seed with the last 4 digits and then append the contents
516                     # of 'hash'.
517                     seed = hash[-4:] + hash
518                 else:
519                     # refill the seed with the contents of 'hash'
520                     seed += hash
521
522             # Take the next 8 digits (32 bytes) and interpret as an integer,
523             # then modulus with the size of the remaining pool to get the next
524             # selected server.
525             probe = int(seed[0:8], 16) % len(pool)
526
527             # Append the selected server to the probe sequence and remove it
528             # from the pool.
529             pseq += [pool[probe]]
530             pool = pool[:probe] + pool[probe+1:]
531
532             # Remove the digits just used from the seed
533             seed = seed[8:]
534         _logger.debug(str(pseq))
535         return pseq
536
537
538     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
539         # roots_map is a dictionary, mapping Keep service root strings
540         # to KeepService objects.  Poll for Keep services, and add any
541         # new ones to roots_map.  Return the current list of local
542         # root strings.
543         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
544         local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
545         for root in local_roots:
546             if root not in roots_map:
547                 roots_map[root] = self.KeepService(root, **headers)
548         return local_roots
549
550     @staticmethod
551     def _check_loop_result(result):
552         # KeepClient RetryLoops should save results as a 2-tuple: the
553         # actual result of the request, and the number of servers available
554         # to receive the request this round.
555         # This method returns True if there's a real result, False if
556         # there are no more servers available, otherwise None.
557         if isinstance(result, Exception):
558             return None
559         result, tried_server_count = result
560         if (result is not None) and (result is not False):
561             return True
562         elif tried_server_count < 1:
563             _logger.info("No more Keep services to try; giving up")
564             return False
565         else:
566             return None
567
568     @retry.retry_method
569     def get(self, loc_s, num_retries=None):
570         """Get data from Keep.
571
572         This method fetches one or more blocks of data from Keep.  It
573         sends a request each Keep service registered with the API
574         server (or the proxy provided when this client was
575         instantiated), then each service named in location hints, in
576         sequence.  As soon as one service provides the data, it's
577         returned.
578
579         Arguments:
580         * loc_s: A string of one or more comma-separated locators to fetch.
581           This method returns the concatenation of these blocks.
582         * num_retries: The number of times to retry GET requests to
583           *each* Keep server if it returns temporary failures, with
584           exponential backoff.  Note that, in each loop, the method may try
585           to fetch data from every available Keep service, along with any
586           that are named in location hints in the locator.  The default value
587           is set when the KeepClient is initialized.
588         """
589         if ',' in loc_s:
590             return ''.join(self.get(x) for x in loc_s.split(','))
591         locator = KeepLocator(loc_s)
592         expect_hash = locator.md5sum
593
594         slot, first = self.block_cache.reserve_cache(expect_hash)
595         if not first:
596             v = slot.get()
597             return v
598
599         # See #3147 for a discussion of the loop implementation.  Highlights:
600         # * Refresh the list of Keep services after each failure, in case
601         #   it's being updated.
602         # * Retry until we succeed, we're out of retries, or every available
603         #   service has returned permanent failure.
604         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
605                       for hint in locator.hints if hint.startswith('K@')]
606         # Map root URLs their KeepService objects.
607         roots_map = {root: self.KeepService(root) for root in hint_roots}
608         blob = None
609         loop = retry.RetryLoop(num_retries, self._check_loop_result,
610                                backoff_start=2)
611         for tries_left in loop:
612             try:
613                 local_roots = self.map_new_services(
614                     roots_map, expect_hash,
615                     force_rebuild=(tries_left < num_retries))
616             except Exception as error:
617                 loop.save_result(error)
618                 continue
619
620             # Query KeepService objects that haven't returned
621             # permanent failure, in our specified shuffle order.
622             services_to_try = [roots_map[root]
623                                for root in (local_roots + hint_roots)
624                                if roots_map[root].usable()]
625             http = httplib2.Http(timeout=self.timeout)
626             for keep_service in services_to_try:
627                 blob = keep_service.get(http, locator)
628                 if blob is not None:
629                     break
630             loop.save_result((blob, len(services_to_try)))
631
632         # Always cache the result, then return it if we succeeded.
633         slot.set(blob)
634         self.block_cache.cap_cache()
635         if loop.success():
636             return blob
637
638         # No servers fulfilled the request.  Count how many responded
639         # "not found;" if the ratio is high enough (currently 75%), report
640         # Not Found; otherwise a generic error.
641         # Q: Including 403 is necessary for the Keep tests to continue
642         # passing, but maybe they should expect KeepReadError instead?
643         not_founds = sum(1 for ks in roots_map.values()
644                          if ks.last_status() in set([403, 404, 410]))
645         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
646             raise arvados.errors.NotFoundError(loc_s)
647         else:
648             raise arvados.errors.KeepReadError(loc_s)
649
650     @retry.retry_method
651     def put(self, data, copies=2, num_retries=None):
652         """Save data in Keep.
653
654         This method will get a list of Keep services from the API server, and
655         send the data to each one simultaneously in a new thread.  Once the
656         uploads are finished, if enough copies are saved, this method returns
657         the most recent HTTP response body.  If requests fail to upload
658         enough copies, this method raises KeepWriteError.
659
660         Arguments:
661         * data: The string of data to upload.
662         * copies: The number of copies that the user requires be saved.
663           Default 2.
664         * num_retries: The number of times to retry PUT requests to
665           *each* Keep server if it returns temporary failures, with
666           exponential backoff.  The default value is set when the
667           KeepClient is initialized.
668         """
669         data_hash = hashlib.md5(data).hexdigest()
670         if copies < 1:
671             return data_hash
672
673         headers = {}
674         if self.using_proxy:
675             # Tell the proxy how many copies we want it to store
676             headers['X-Keep-Desired-Replication'] = str(copies)
677         roots_map = {}
678         thread_limiter = KeepClient.ThreadLimiter(copies)
679         loop = retry.RetryLoop(num_retries, self._check_loop_result,
680                                backoff_start=2)
681         for tries_left in loop:
682             try:
683                 local_roots = self.map_new_services(
684                     roots_map, data_hash,
685                     force_rebuild=(tries_left < num_retries), **headers)
686             except Exception as error:
687                 loop.save_result(error)
688                 continue
689
690             threads = []
691             for service_root, ks in roots_map.iteritems():
692                 if ks.finished():
693                     continue
694                 t = KeepClient.KeepWriterThread(
695                     ks,
696                     data=data,
697                     data_hash=data_hash,
698                     service_root=service_root,
699                     thread_limiter=thread_limiter,
700                     timeout=self.timeout)
701                 t.start()
702                 threads.append(t)
703             for t in threads:
704                 t.join()
705             loop.save_result((thread_limiter.done() >= copies, len(threads)))
706
707         if loop.success():
708             return thread_limiter.response()
709         raise arvados.errors.KeepWriteError(
710             "Write fail for %s: wanted %d but wrote %d" %
711             (data_hash, copies, thread_limiter.done()))
712
713     # Local storage methods need no-op num_retries arguments to keep
714     # integration tests happy.  With better isolation they could
715     # probably be removed again.
716     def local_store_put(self, data, num_retries=0):
717         md5 = hashlib.md5(data).hexdigest()
718         locator = '%s+%d' % (md5, len(data))
719         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
720             f.write(data)
721         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
722                   os.path.join(self.local_store, md5))
723         return locator
724
725     def local_store_get(self, loc_s, num_retries=0):
726         try:
727             locator = KeepLocator(loc_s)
728         except ValueError:
729             raise arvados.errors.NotFoundError(
730                 "Invalid data locator: '%s'" % loc_s)
731         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
732             return ''
733         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
734             return f.read()