def _get_user_agent(self):
try:
- return self._user_agent_pool.get(False)
+ return self._user_agent_pool.get(block=False)
except Queue.Empty:
return pycurl.Curl()
def _put_user_agent(self, ua):
try:
ua.reset()
- self._user_agent_pool.put(ua, False)
+ self._user_agent_pool.put(ua, block=False)
except:
ua.close()
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
+ with self.pending_tries_notification:
+ self.pending_tries_notification.notify_all()
- def write_fail(self, ks, status_code):
+ def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
-
-
+
+ def get_next_task(self):
+ with self.pending_tries_notification:
+ while True:
+ if self.pending_copies() < 1:
+ # This notify_all() is unnecessary --
+ # write_success() already called notify_all()
+ # when pending<1 became true, so it's not
+ # possible for any other thread to be in
+ # wait() now -- but it's cheap insurance
+ # against deadlock so we do it anyway:
+ self.pending_tries_notification.notify_all()
+ # Drain the queue and then raise Queue.Empty
+ while True:
+ self.get_nowait()
+ self.task_done()
+ elif self.pending_tries > 0:
+ service, service_root = self.get_nowait()
+ if service.finished():
+ self.task_done()
+ continue
+ self.pending_tries -= 1
+ return service, service_root
+ elif self.empty():
+ self.pending_tries_notification.notify_all()
+ raise Queue.Empty
+ else:
+ self.pending_tries_notification.wait()
+
+
class KeepWriterThreadPool(object):
def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
self.total_task_nr = 0
worker.start()
# Wait for finished work
self.queue.join()
- with self.queue.pending_tries_notification:
- self.queue.pending_tries_notification.notify_all()
- for worker in self.workers:
- worker.join()
def response(self):
return self.queue.response
class KeepWriterThread(threading.Thread):
+ TaskFailed = RuntimeError()
+
def __init__(self, queue, data, data_hash, timeout=None):
super(KeepClient.KeepWriterThread, self).__init__()
self.timeout = timeout
self.queue = queue
self.data = data
self.data_hash = data_hash
-
+ self.daemon = True
+
def run(self):
- while not self.queue.empty():
- if self.queue.pending_copies() > 0:
- # Avoid overreplication, wait for some needed re-attempt
- with self.queue.pending_tries_notification:
- if self.queue.pending_tries <= 0:
- self.queue.pending_tries_notification.wait()
- continue # try again when awake
- self.queue.pending_tries -= 1
-
- # Get to work
- try:
- service, service_root = self.queue.get_nowait()
- except Queue.Empty:
- continue
- if service.finished():
- self.queue.task_done()
- continue
- success = bool(service.put(self.data_hash,
- self.data,
- timeout=self.timeout))
- result = service.last_result()
- if success:
- _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
- str(threading.current_thread()),
- self.data_hash,
- len(self.data),
- service_root)
- try:
- replicas_stored = int(result['headers']['x-keep-replicas-stored'])
- except (KeyError, ValueError):
- replicas_stored = 1
-
- self.queue.write_success(result['body'].strip(), replicas_stored)
- else:
- if result.get('status_code', None):
- _logger.debug("Request fail: PUT %s => %s %s",
- self.data_hash,
- result['status_code'],
- result['body'])
- self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
- # Mark as done so the queue can be join()ed
- self.queue.task_done()
+ while True:
+ try:
+ service, service_root = self.queue.get_next_task()
+ except Queue.Empty:
+ return
+ try:
+ locator, copies = self.do_task(service, service_root)
+ except Exception as e:
+ if e is not self.TaskFailed:
+ _logger.exception("Exception in KeepWriterThread")
+ self.queue.write_fail(service)
else:
- # Remove the task from the queue anyways
- try:
- self.queue.get_nowait()
- # Mark as done so the queue can be join()ed
- self.queue.task_done()
- except Queue.Empty:
- continue
+ self.queue.write_success(locator, copies)
+ finally:
+ self.queue.task_done()
+
+ def do_task(self, service, service_root):
+ success = bool(service.put(self.data_hash,
+ self.data,
+ timeout=self.timeout))
+ result = service.last_result()
+
+ if not success:
+ if result.get('status_code', None):
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.data_hash,
+ result['status_code'],
+ result['body'])
+ raise self.TaskFailed
+
+ _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+ str(threading.current_thread()),
+ self.data_hash,
+ len(self.data),
+ service_root)
+ try:
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
+ replicas_stored = 1
+
+ return result['body'].strip(), replicas_stored
def __init__(self, api_client=None, proxy=None,