import threading
from . import timer
import urllib.parse
-import errno
if sys.version_info >= (3, 0):
from io import BytesIO
self._lastheadername = name
self._headers[name] = value
# Returning None implies all bytes were written
-
+
class KeepWriterQueue(queue.Queue):
def __init__(self, copies):
self.successful_copies_lock = threading.Lock()
self.pending_tries = copies
self.pending_tries_notification = threading.Condition()
-
+
def write_success(self, response, replicas_nr):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
with self.pending_tries_notification:
self.pending_tries_notification.notify_all()
-
+
def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
-
+
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
for _ in range(num_threads):
w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
self.workers.append(w)
-
+
def add_task(self, ks, service_root):
self.queue.put((ks, service_root))
self.total_task_nr += 1
-
+
def done(self):
return self.queue.successful_copies
-
+
def join(self):
# Start workers
for worker in self.workers:
worker.start()
# Wait for finished work
self.queue.join()
-
+
def response(self):
return self.queue.response
-
-
+
+
class KeepWriterThread(threading.Thread):
TaskFailed = RuntimeError()
loop.save_result(error)
continue
- writer_pool = KeepClient.KeepWriterThreadPool(data=data,
+ writer_pool = KeepClient.KeepWriterThreadPool(data=data,
data_hash=data_hash,
copies=copies - done,
max_service_replicas=self.max_replicas_per_service,
"Invalid data locator: '%s'" % loc_s)
if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
return b''
-
- try:
- with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
- return f.read()
- except IOError as e:
- if e.errno == errno.ENOENT:
- raise arvados.errors.NotFoundError("Not found '%s'" % locator.md5sum)
- else:
- raise
-
+ with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
+ return f.read()
def is_cached(self, locator):
return self.block_cache.reserve_cache(expect_hash)