12033: Add config to link multi-site search to a remote site.
[arvados.git] / sdk / python / arvados / keep.py
index 4103b308f186aca376781932c766d1c7fb6da35d..e6e93f080659abf9a51cec9d4425cafe8bdc976b 100644 (file)
@@ -27,7 +27,6 @@ import sys
 import threading
 from . import timer
 import urllib.parse
-import errno
 
 if sys.version_info >= (3, 0):
     from io import BytesIO
@@ -541,7 +540,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-
+    
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -552,19 +551,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-
+        
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-
+        
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-
+        
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -613,25 +612,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-
+        
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-
+        
         def done(self):
             return self.queue.successful_copies
-
+        
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-
+        
         def response(self):
             return self.queue.response
-
-
+    
+    
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -1131,7 +1130,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
@@ -1188,16 +1187,8 @@ class KeepClient(object):
                 "Invalid data locator: '%s'" % loc_s)
         if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
             return b''
-
-        try:
-            with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
-                return f.read()
-        except IOError as e:
-            if e.errno == errno.ENOENT:
-                raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
-            else:
-                raise
-
+        with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+            return f.read()
 
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)