Merge branch '3381-job-progress-bar-bug' closes #3381
[arvados.git] / sdk / python / arvados / keep.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20 import timer
21 import datetime
22 import ssl
23 import socket
24
25 _logger = logging.getLogger('arvados.keep')
26 global_client_object = None
27
28 import arvados
29 import arvados.config as config
30 import arvados.errors
31 import arvados.retry as retry
32 import arvados.util
33
34 class KeepLocator(object):
35     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
36     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
37
38     def __init__(self, locator_str):
39         self.hints = []
40         self._perm_sig = None
41         self._perm_expiry = None
42         pieces = iter(locator_str.split('+'))
43         self.md5sum = next(pieces)
44         try:
45             self.size = int(next(pieces))
46         except StopIteration:
47             self.size = None
48         for hint in pieces:
49             if self.HINT_RE.match(hint) is None:
50                 raise ValueError("unrecognized hint data {}".format(hint))
51             elif hint.startswith('A'):
52                 self.parse_permission_hint(hint)
53             else:
54                 self.hints.append(hint)
55
56     def __str__(self):
57         return '+'.join(
58             str(s) for s in [self.md5sum, self.size,
59                              self.permission_hint()] + self.hints
60             if s is not None)
61
62     def _make_hex_prop(name, length):
63         # Build and return a new property with the given name that
64         # must be a hex string of the given length.
65         data_name = '_{}'.format(name)
66         def getter(self):
67             return getattr(self, data_name)
68         def setter(self, hex_str):
69             if not arvados.util.is_hex(hex_str, length):
70                 raise ValueError("{} must be a {}-digit hex string: {}".
71                                  format(name, length, hex_str))
72             setattr(self, data_name, hex_str)
73         return property(getter, setter)
74
75     md5sum = _make_hex_prop('md5sum', 32)
76     perm_sig = _make_hex_prop('perm_sig', 40)
77
78     @property
79     def perm_expiry(self):
80         return self._perm_expiry
81
82     @perm_expiry.setter
83     def perm_expiry(self, value):
84         if not arvados.util.is_hex(value, 1, 8):
85             raise ValueError(
86                 "permission timestamp must be a hex Unix timestamp: {}".
87                 format(value))
88         self._perm_expiry = datetime.datetime.utcfromtimestamp(int(value, 16))
89
90     def permission_hint(self):
91         data = [self.perm_sig, self.perm_expiry]
92         if None in data:
93             return None
94         data[1] = int((data[1] - self.EPOCH_DATETIME).total_seconds())
95         return "A{}@{:08x}".format(*data)
96
97     def parse_permission_hint(self, s):
98         try:
99             self.perm_sig, self.perm_expiry = s[1:].split('@', 1)
100         except IndexError:
101             raise ValueError("bad permission hint {}".format(s))
102
103     def permission_expired(self, as_of_dt=None):
104         if self.perm_expiry is None:
105             return False
106         elif as_of_dt is None:
107             as_of_dt = datetime.datetime.now()
108         return self.perm_expiry <= as_of_dt
109
110
111 class Keep(object):
112     """Simple interface to a global KeepClient object.
113
114     THIS CLASS IS DEPRECATED.  Please instantiate your own KeepClient with your
115     own API client.  The global KeepClient will build an API client from the
116     current Arvados configuration, which may not match the one you built.
117     """
118     _last_key = None
119
120     @classmethod
121     def global_client_object(cls):
122         global global_client_object
123         # Previously, KeepClient would change its behavior at runtime based
124         # on these configuration settings.  We simulate that behavior here
125         # by checking the values and returning a new KeepClient if any of
126         # them have changed.
127         key = (config.get('ARVADOS_API_HOST'),
128                config.get('ARVADOS_API_TOKEN'),
129                config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
130                config.get('ARVADOS_KEEP_PROXY'),
131                config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
132                os.environ.get('KEEP_LOCAL_STORE'))
133         if (global_client_object is None) or (cls._last_key != key):
134             global_client_object = KeepClient()
135             cls._last_key = key
136         return global_client_object
137
138     @staticmethod
139     def get(locator, **kwargs):
140         return Keep.global_client_object().get(locator, **kwargs)
141
142     @staticmethod
143     def put(data, **kwargs):
144         return Keep.global_client_object().put(data, **kwargs)
145
146 class KeepBlockCache(object):
147     # Default RAM cache is 256MiB
148     def __init__(self, cache_max=(256 * 1024 * 1024)):
149         self.cache_max = cache_max
150         self._cache = []
151         self._cache_lock = threading.Lock()
152
153     class CacheSlot(object):
154         def __init__(self, locator):
155             self.locator = locator
156             self.ready = threading.Event()
157             self.content = None
158
159         def get(self):
160             self.ready.wait()
161             return self.content
162
163         def set(self, value):
164             self.content = value
165             self.ready.set()
166
167         def size(self):
168             if self.content == None:
169                 return 0
170             else:
171                 return len(self.content)
172
173     def cap_cache(self):
174         '''Cap the cache size to self.cache_max'''
175         self._cache_lock.acquire()
176         try:
177             # Select all slots except those where ready.is_set() and content is
178             # None (that means there was an error reading the block).
179             self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
180             sm = sum([slot.size() for slot in self._cache])
181             while len(self._cache) > 0 and sm > self.cache_max:
182                 for i in xrange(len(self._cache)-1, -1, -1):
183                     if self._cache[i].ready.is_set():
184                         del self._cache[i]
185                         break
186                 sm = sum([slot.size() for slot in self._cache])
187         finally:
188             self._cache_lock.release()
189
190     def reserve_cache(self, locator):
191         '''Reserve a cache slot for the specified locator,
192         or return the existing slot.'''
193         self._cache_lock.acquire()
194         try:
195             # Test if the locator is already in the cache
196             for i in xrange(0, len(self._cache)):
197                 if self._cache[i].locator == locator:
198                     n = self._cache[i]
199                     if i != 0:
200                         # move it to the front
201                         del self._cache[i]
202                         self._cache.insert(0, n)
203                     return n, False
204
205             # Add a new cache slot for the locator
206             n = KeepBlockCache.CacheSlot(locator)
207             self._cache.insert(0, n)
208             return n, True
209         finally:
210             self._cache_lock.release()
211
212 class KeepClient(object):
213     class ThreadLimiter(object):
214         """
215         Limit the number of threads running at a given time to
216         {desired successes} minus {successes reported}. When successes
217         reported == desired, wake up the remaining threads and tell
218         them to quit.
219
220         Should be used in a "with" block.
221         """
222         def __init__(self, todo):
223             self._todo = todo
224             self._done = 0
225             self._response = None
226             self._todo_lock = threading.Semaphore(todo)
227             self._done_lock = threading.Lock()
228
229         def __enter__(self):
230             self._todo_lock.acquire()
231             return self
232
233         def __exit__(self, type, value, traceback):
234             self._todo_lock.release()
235
236         def shall_i_proceed(self):
237             """
238             Return true if the current thread should do stuff. Return
239             false if the current thread should just stop.
240             """
241             with self._done_lock:
242                 return (self._done < self._todo)
243
244         def save_response(self, response_body, replicas_stored):
245             """
246             Records a response body (a locator, possibly signed) returned by
247             the Keep server.  It is not necessary to save more than
248             one response, since we presume that any locator returned
249             in response to a successful request is valid.
250             """
251             with self._done_lock:
252                 self._done += replicas_stored
253                 self._response = response_body
254
255         def response(self):
256             """
257             Returns the body from the response to a PUT request.
258             """
259             with self._done_lock:
260                 return self._response
261
262         def done(self):
263             """
264             Return how many successes were reported.
265             """
266             with self._done_lock:
267                 return self._done
268
269
270     class KeepService(object):
271         # Make requests to a single Keep service, and track results.
272         HTTP_ERRORS = (httplib2.HttpLib2Error, httplib.HTTPException,
273                        socket.error, ssl.SSLError)
274
275         def __init__(self, root, **headers):
276             self.root = root
277             self.last_result = None
278             self.success_flag = None
279             self.get_headers = {'Accept': 'application/octet-stream'}
280             self.get_headers.update(headers)
281             self.put_headers = headers
282
283         def usable(self):
284             return self.success_flag is not False
285
286         def finished(self):
287             return self.success_flag is not None
288
289         def last_status(self):
290             try:
291                 return int(self.last_result[0].status)
292             except (AttributeError, IndexError, ValueError):
293                 return None
294
295         def get(self, http, locator):
296             # http is an httplib2.Http object.
297             # locator is a KeepLocator object.
298             url = self.root + str(locator)
299             _logger.debug("Request: GET %s", url)
300             try:
301                 with timer.Timer() as t:
302                     result = http.request(url.encode('utf-8'), 'GET',
303                                           headers=self.get_headers)
304             except self.HTTP_ERRORS as e:
305                 _logger.debug("Request fail: GET %s => %s: %s",
306                               url, type(e), str(e))
307                 self.last_result = e
308             else:
309                 self.last_result = result
310                 self.success_flag = retry.check_http_response_success(result)
311                 content = result[1]
312                 _logger.info("%s response: %s bytes in %s msec (%.3f MiB/sec)",
313                              self.last_status(), len(content), t.msecs,
314                              (len(content)/(1024.0*1024))/t.secs)
315                 if self.success_flag:
316                     resp_md5 = hashlib.md5(content).hexdigest()
317                     if resp_md5 == locator.md5sum:
318                         return content
319                     _logger.warning("Checksum fail: md5(%s) = %s",
320                                     url, resp_md5)
321             return None
322
323         def put(self, http, hash_s, body):
324             url = self.root + hash_s
325             _logger.debug("Request: PUT %s", url)
326             try:
327                 result = http.request(url.encode('utf-8'), 'PUT',
328                                       headers=self.put_headers, body=body)
329             except self.HTTP_ERRORS as e:
330                 _logger.debug("Request fail: PUT %s => %s: %s",
331                               url, type(e), str(e))
332                 self.last_result = e
333             else:
334                 self.last_result = result
335                 self.success_flag = retry.check_http_response_success(result)
336             return self.success_flag
337
338
339     class KeepWriterThread(threading.Thread):
340         """
341         Write a blob of data to the given Keep server. On success, call
342         save_response() of the given ThreadLimiter to save the returned
343         locator.
344         """
345         def __init__(self, keep_service, **kwargs):
346             super(KeepClient.KeepWriterThread, self).__init__()
347             self.service = keep_service
348             self.args = kwargs
349             self._success = False
350
351         def success(self):
352             return self._success
353
354         def run(self):
355             with self.args['thread_limiter'] as limiter:
356                 if not limiter.shall_i_proceed():
357                     # My turn arrived, but the job has been done without
358                     # me.
359                     return
360                 self.run_with_limiter(limiter)
361
362         def run_with_limiter(self, limiter):
363             if self.service.finished():
364                 return
365             _logger.debug("KeepWriterThread %s proceeding %s %s",
366                           str(threading.current_thread()),
367                           self.args['data_hash'],
368                           self.args['service_root'])
369             h = httplib2.Http(timeout=self.args.get('timeout', None))
370             self._success = bool(self.service.put(
371                     h, self.args['data_hash'], self.args['data']))
372             status = self.service.last_status()
373             if self._success:
374                 resp, body = self.service.last_result
375                 _logger.debug("KeepWriterThread %s succeeded %s %s",
376                               str(threading.current_thread()),
377                               self.args['data_hash'],
378                               self.args['service_root'])
379                 # Tick the 'done' counter for the number of replica
380                 # reported stored by the server, for the case that
381                 # we're talking to a proxy or other backend that
382                 # stores to multiple copies for us.
383                 try:
384                     replicas_stored = int(resp['x-keep-replicas-stored'])
385                 except (KeyError, ValueError):
386                     replicas_stored = 1
387                 limiter.save_response(body.strip(), replicas_stored)
388             elif status is not None:
389                 _logger.debug("Request fail: PUT %s => %s %s",
390                               self.args['data_hash'], status,
391                               self.service.last_result[1])
392
393
394     def __init__(self, api_client=None, proxy=None, timeout=300,
395                  api_token=None, local_store=None, block_cache=None,
396                  num_retries=0):
397         """Initialize a new KeepClient.
398
399         Arguments:
400         * api_client: The API client to use to find Keep services.  If not
401           provided, KeepClient will build one from available Arvados
402           configuration.
403         * proxy: If specified, this KeepClient will send requests to this
404           Keep proxy.  Otherwise, KeepClient will fall back to the setting
405           of the ARVADOS_KEEP_PROXY configuration setting.  If you want to
406           ensure KeepClient does not use a proxy, pass in an empty string.
407         * timeout: The timeout for all HTTP requests, in seconds.  Default
408           300.
409         * api_token: If you're not using an API client, but only talking
410           directly to a Keep proxy, this parameter specifies an API token
411           to authenticate Keep requests.  It is an error to specify both
412           api_client and api_token.  If you specify neither, KeepClient
413           will use one available from the Arvados configuration.
414         * local_store: If specified, this KeepClient will bypass Keep
415           services, and save data to the named directory.  If unspecified,
416           KeepClient will fall back to the setting of the $KEEP_LOCAL_STORE
417           environment variable.  If you want to ensure KeepClient does not
418           use local storage, pass in an empty string.  This is primarily
419           intended to mock a server for testing.
420         * num_retries: The default number of times to retry failed requests.
421           This will be used as the default num_retries value when get() and
422           put() are called.  Default 0.
423         """
424         self.lock = threading.Lock()
425         if proxy is None:
426             proxy = config.get('ARVADOS_KEEP_PROXY')
427         if api_token is None:
428             api_token = config.get('ARVADOS_API_TOKEN')
429         elif api_client is not None:
430             raise ValueError(
431                 "can't build KeepClient with both API client and token")
432         if local_store is None:
433             local_store = os.environ.get('KEEP_LOCAL_STORE')
434
435         self.block_cache = block_cache if block_cache else KeepBlockCache()
436
437         if local_store:
438             self.local_store = local_store
439             self.get = self.local_store_get
440             self.put = self.local_store_put
441         else:
442             self.timeout = timeout
443             self.num_retries = num_retries
444             if proxy:
445                 if not proxy.endswith('/'):
446                     proxy += '/'
447                 self.api_token = api_token
448                 self.service_roots = [proxy]
449                 self.using_proxy = True
450                 self.static_service_roots = True
451             else:
452                 # It's important to avoid instantiating an API client
453                 # unless we actually need one, for testing's sake.
454                 if api_client is None:
455                     api_client = arvados.api('v1')
456                 self.api_client = api_client
457                 self.api_token = api_client.api_token
458                 self.service_roots = None
459                 self.using_proxy = None
460                 self.static_service_roots = False
461
462     def build_service_roots(self, force_rebuild=False):
463         if (self.static_service_roots or
464               (self.service_roots and not force_rebuild)):
465             return
466         with self.lock:
467             try:
468                 keep_services = self.api_client.keep_services().accessible()
469             except Exception:  # API server predates Keep services.
470                 keep_services = self.api_client.keep_disks().list()
471
472             keep_services = keep_services.execute().get('items')
473             if not keep_services:
474                 raise arvados.errors.NoKeepServersError()
475
476             self.using_proxy = any(ks.get('service_type') == 'proxy'
477                                    for ks in keep_services)
478
479             roots = ("{}://[{}]:{:d}/".format(
480                         'https' if ks['service_ssl_flag'] else 'http',
481                          ks['service_host'],
482                          ks['service_port'])
483                      for ks in keep_services)
484             self.service_roots = sorted(set(roots))
485             _logger.debug(str(self.service_roots))
486
487     def shuffled_service_roots(self, hash, force_rebuild=False):
488         self.build_service_roots(force_rebuild)
489
490         # Build an ordering with which to query the Keep servers based on the
491         # contents of the hash.
492         # "hash" is a hex-encoded number at least 8 digits
493         # (32 bits) long
494
495         # seed used to calculate the next keep server from 'pool'
496         # to be added to 'pseq'
497         seed = hash
498
499         # Keep servers still to be added to the ordering
500         pool = self.service_roots[:]
501
502         # output probe sequence
503         pseq = []
504
505         # iterate while there are servers left to be assigned
506         while len(pool) > 0:
507             if len(seed) < 8:
508                 # ran out of digits in the seed
509                 if len(pseq) < len(hash) / 4:
510                     # the number of servers added to the probe sequence is less
511                     # than the number of 4-digit slices in 'hash' so refill the
512                     # seed with the last 4 digits and then append the contents
513                     # of 'hash'.
514                     seed = hash[-4:] + hash
515                 else:
516                     # refill the seed with the contents of 'hash'
517                     seed += hash
518
519             # Take the next 8 digits (32 bytes) and interpret as an integer,
520             # then modulus with the size of the remaining pool to get the next
521             # selected server.
522             probe = int(seed[0:8], 16) % len(pool)
523
524             # Append the selected server to the probe sequence and remove it
525             # from the pool.
526             pseq += [pool[probe]]
527             pool = pool[:probe] + pool[probe+1:]
528
529             # Remove the digits just used from the seed
530             seed = seed[8:]
531         _logger.debug(str(pseq))
532         return pseq
533
534
535     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
536         # roots_map is a dictionary, mapping Keep service root strings
537         # to KeepService objects.  Poll for Keep services, and add any
538         # new ones to roots_map.  Return the current list of local
539         # root strings.
540         headers.setdefault('Authorization', "OAuth2 %s" % (self.api_token,))
541         local_roots = self.shuffled_service_roots(md5_s, force_rebuild)
542         for root in local_roots:
543             if root not in roots_map:
544                 roots_map[root] = self.KeepService(root, **headers)
545         return local_roots
546
547     @staticmethod
548     def _check_loop_result(result):
549         # KeepClient RetryLoops should save results as a 2-tuple: the
550         # actual result of the request, and the number of servers available
551         # to receive the request this round.
552         # This method returns True if there's a real result, False if
553         # there are no more servers available, otherwise None.
554         if isinstance(result, Exception):
555             return None
556         result, tried_server_count = result
557         if (result is not None) and (result is not False):
558             return True
559         elif tried_server_count < 1:
560             _logger.info("No more Keep services to try; giving up")
561             return False
562         else:
563             return None
564
565     @retry.retry_method
566     def get(self, loc_s, num_retries=None):
567         """Get data from Keep.
568
569         This method fetches one or more blocks of data from Keep.  It
570         sends a request each Keep service registered with the API
571         server (or the proxy provided when this client was
572         instantiated), then each service named in location hints, in
573         sequence.  As soon as one service provides the data, it's
574         returned.
575
576         Arguments:
577         * loc_s: A string of one or more comma-separated locators to fetch.
578           This method returns the concatenation of these blocks.
579         * num_retries: The number of times to retry GET requests to
580           *each* Keep server if it returns temporary failures, with
581           exponential backoff.  Note that, in each loop, the method may try
582           to fetch data from every available Keep service, along with any
583           that are named in location hints in the locator.  The default value
584           is set when the KeepClient is initialized.
585         """
586         if ',' in loc_s:
587             return ''.join(self.get(x) for x in loc_s.split(','))
588         locator = KeepLocator(loc_s)
589         expect_hash = locator.md5sum
590
591         slot, first = self.block_cache.reserve_cache(expect_hash)
592         if not first:
593             v = slot.get()
594             return v
595
596         # See #3147 for a discussion of the loop implementation.  Highlights:
597         # * Refresh the list of Keep services after each failure, in case
598         #   it's being updated.
599         # * Retry until we succeed, we're out of retries, or every available
600         #   service has returned permanent failure.
601         hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
602                       for hint in locator.hints if hint.startswith('K@')]
603         # Map root URLs their KeepService objects.
604         roots_map = {root: self.KeepService(root) for root in hint_roots}
605         blob = None
606         loop = retry.RetryLoop(num_retries, self._check_loop_result,
607                                backoff_start=2)
608         for tries_left in loop:
609             try:
610                 local_roots = self.map_new_services(
611                     roots_map, expect_hash,
612                     force_rebuild=(tries_left < num_retries))
613             except Exception as error:
614                 loop.save_result(error)
615                 continue
616
617             # Query KeepService objects that haven't returned
618             # permanent failure, in our specified shuffle order.
619             services_to_try = [roots_map[root]
620                                for root in (local_roots + hint_roots)
621                                if roots_map[root].usable()]
622             http = httplib2.Http(timeout=self.timeout)
623             for keep_service in services_to_try:
624                 blob = keep_service.get(http, locator)
625                 if blob is not None:
626                     break
627             loop.save_result((blob, len(services_to_try)))
628
629         # Always cache the result, then return it if we succeeded.
630         slot.set(blob)
631         self.block_cache.cap_cache()
632         if loop.success():
633             return blob
634
635         # No servers fulfilled the request.  Count how many responded
636         # "not found;" if the ratio is high enough (currently 75%), report
637         # Not Found; otherwise a generic error.
638         # Q: Including 403 is necessary for the Keep tests to continue
639         # passing, but maybe they should expect KeepReadError instead?
640         not_founds = sum(1 for ks in roots_map.values()
641                          if ks.last_status() in set([403, 404, 410]))
642         if roots_map and ((float(not_founds) / len(roots_map)) >= .75):
643             raise arvados.errors.NotFoundError(loc_s)
644         else:
645             raise arvados.errors.KeepReadError(loc_s)
646
647     @retry.retry_method
648     def put(self, data, copies=2, num_retries=None):
649         """Save data in Keep.
650
651         This method will get a list of Keep services from the API server, and
652         send the data to each one simultaneously in a new thread.  Once the
653         uploads are finished, if enough copies are saved, this method returns
654         the most recent HTTP response body.  If requests fail to upload
655         enough copies, this method raises KeepWriteError.
656
657         Arguments:
658         * data: The string of data to upload.
659         * copies: The number of copies that the user requires be saved.
660           Default 2.
661         * num_retries: The number of times to retry PUT requests to
662           *each* Keep server if it returns temporary failures, with
663           exponential backoff.  The default value is set when the
664           KeepClient is initialized.
665         """
666         data_hash = hashlib.md5(data).hexdigest()
667         if copies < 1:
668             return data_hash
669
670         headers = {}
671         if self.using_proxy:
672             # Tell the proxy how many copies we want it to store
673             headers['X-Keep-Desired-Replication'] = str(copies)
674         roots_map = {}
675         thread_limiter = KeepClient.ThreadLimiter(copies)
676         loop = retry.RetryLoop(num_retries, self._check_loop_result,
677                                backoff_start=2)
678         for tries_left in loop:
679             try:
680                 local_roots = self.map_new_services(
681                     roots_map, data_hash,
682                     force_rebuild=(tries_left < num_retries), **headers)
683             except Exception as error:
684                 loop.save_result(error)
685                 continue
686
687             threads = []
688             for service_root, ks in roots_map.iteritems():
689                 if ks.finished():
690                     continue
691                 t = KeepClient.KeepWriterThread(
692                     ks,
693                     data=data,
694                     data_hash=data_hash,
695                     service_root=service_root,
696                     thread_limiter=thread_limiter,
697                     timeout=self.timeout)
698                 t.start()
699                 threads.append(t)
700             for t in threads:
701                 t.join()
702             loop.save_result((thread_limiter.done() >= copies, len(threads)))
703
704         if loop.success():
705             return thread_limiter.response()
706         raise arvados.errors.KeepWriteError(
707             "Write fail for %s: wanted %d but wrote %d" %
708             (data_hash, copies, thread_limiter.done()))
709
710     # Local storage methods need no-op num_retries arguments to keep
711     # integration tests happy.  With better isolation they could
712     # probably be removed again.
713     def local_store_put(self, data, num_retries=0):
714         md5 = hashlib.md5(data).hexdigest()
715         locator = '%s+%d' % (md5, len(data))
716         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
717             f.write(data)
718         os.rename(os.path.join(self.local_store, md5 + '.tmp'),
719                   os.path.join(self.local_store, md5))
720         return locator
721
722     def local_store_get(self, loc_s, num_retries=0):
723         try:
724             locator = KeepLocator(loc_s)
725         except ValueError:
726             raise arvados.errors.NotFoundError(
727                 "Invalid data locator: '%s'" % loc_s)
728         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
729             return ''
730         with open(os.path.join(self.local_store, locator.md5sum), 'r') as f:
731             return f.read()