- while not self.queue.empty():
- if self.queue.pending_copies() > 0:
- # Avoid overreplication, wait for some needed retry
- with self.queue.retries_notification:
- if not self.queue.retries > 0:
- self.queue.retries_notification.wait()
- continue # try again when awake
- self.queue.retries -= 1
-
- # Get to work
- try:
- service, service_root = self.queue.get_nowait()
- except Queue.Empty:
- continue
- if service.finished():
- self.queue.task_done()
- continue
- success = bool(service.put(self.data_hash,
- self.data,
- timeout=self.timeout))
- result = service.last_result()
- if success:
- _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
- str(threading.current_thread()),
- self.data_hash,
- len(self.data),
- service_root)
- try:
- replicas_stored = int(result['headers']['x-keep-replicas-stored'])
- except (KeyError, ValueError):
- replicas_stored = 1
-
- self.queue.write_success(result['body'].strip(), replicas_stored)
- else:
- if result.get('status_code', None):
- _logger.debug("Request fail: PUT %s => %s %s",
- self.data_hash,
- result['status_code'],
- result['body'])
- self.queue.write_fail(service, result.get('status_code', None)) # Schedule a retry
+ while True:
+ try:
+ service, service_root = self.queue.get_next_task()
+ except Queue.Empty:
+ return
+ try:
+ locator, copies = self.do_task(service, service_root)
+ except Exception as e:
+ if e is not self.TaskFailed:
+ _logger.exception("Exception in KeepWriterThread")
+ self.queue.write_fail(service)