def _get_user_agent(self):
try:
- return self._user_agent_pool.get(False)
+ return self._user_agent_pool.get(block=False)
except Queue.Empty:
return pycurl.Curl()
def _put_user_agent(self, ua):
try:
ua.reset()
- self._user_agent_pool.put(ua, False)
+ self._user_agent_pool.put(ua, block=False)
except:
ua.close()
with self.pending_tries_notification:
while True:
if self.pending_copies() < 1:
+ # This notify_all() is unnecessary --
+ # write_success() already called notify_all()
+ # when pending<1 became true, so it's not
+ # possible for any other thread to be in
+ # wait() now -- but it's cheap insurance
+ # against deadlock so we do it anyway:
+ self.pending_tries_notification.notify_all()
# Drain the queue and then raise Queue.Empty
while True:
self.get_nowait()
self.task_done()
elif self.pending_tries > 0:
+ service, service_root = self.get_nowait()
+ if service.finished():
+ self.task_done()
+ continue
self.pending_tries -= 1
- return self.get_nowait()
+ return service, service_root
elif self.empty():
self.pending_tries_notification.notify_all()
raise Queue.Empty
try:
locator, copies = self.do_task(service, service_root)
except Exception as e:
- if e != self.TaskFailed:
+ if e is not self.TaskFailed:
_logger.exception("Exception in KeepWriterThread")
self.queue.write_fail(service)
else:
self.queue.task_done()
def do_task(self, service, service_root):
- if service.finished():
- return
success = bool(service.put(self.data_hash,
self.data,
timeout=self.timeout))