X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bffd1e4ee8532992b3790e4f232804a6731a9685..d047c1cb9ceecb6e324adf102e5e38e11fe698e1:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index 4103b308f1..e6e93f0806 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -27,7 +27,6 @@ import sys import threading from . import timer import urllib.parse -import errno if sys.version_info >= (3, 0): from io import BytesIO @@ -541,7 +540,7 @@ class KeepClient(object): self._lastheadername = name self._headers[name] = value # Returning None implies all bytes were written - + class KeepWriterQueue(queue.Queue): def __init__(self, copies): @@ -552,19 +551,19 @@ class KeepClient(object): self.successful_copies_lock = threading.Lock() self.pending_tries = copies self.pending_tries_notification = threading.Condition() - + def write_success(self, response, replicas_nr): with self.successful_copies_lock: self.successful_copies += replicas_nr self.response = response with self.pending_tries_notification: self.pending_tries_notification.notify_all() - + def write_fail(self, ks): with self.pending_tries_notification: self.pending_tries += 1 self.pending_tries_notification.notify() - + def pending_copies(self): with self.successful_copies_lock: return self.wanted_copies - self.successful_copies @@ -613,25 +612,25 @@ class KeepClient(object): for _ in range(num_threads): w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout) self.workers.append(w) - + def add_task(self, ks, service_root): self.queue.put((ks, service_root)) self.total_task_nr += 1 - + def done(self): return self.queue.successful_copies - + def join(self): # Start workers for worker in self.workers: worker.start() # Wait for finished work self.queue.join() - + def response(self): return self.queue.response - - + + class KeepWriterThread(threading.Thread): TaskFailed = RuntimeError() @@ -1131,7 +1130,7 @@ class KeepClient(object): loop.save_result(error) continue - writer_pool = KeepClient.KeepWriterThreadPool(data=data, + writer_pool = KeepClient.KeepWriterThreadPool(data=data, data_hash=data_hash, copies=copies - done, max_service_replicas=self.max_replicas_per_service, @@ -1188,16 +1187,8 @@ class KeepClient(object): "Invalid data locator: '%s'" % loc_s) if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]: return b'' - - try: - with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: - return f.read() - except IOError as e: - if e.errno == errno.ENOENT: - raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum) - else: - raise - + with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f: + return f.read() def is_cached(self, locator): return self.block_cache.reserve_cache(expect_hash)