def _setcurltimeouts(self, curl, timeouts):
    """Apply connect/transfer timeout settings to a pycurl handle.

    `timeouts` may be falsy (leave curl's defaults untouched), a single
    number (used as both connect and transfer timeout), a 2-tuple of
    (connect, transfer), or a 3-tuple of (connect, transfer,
    minimum-bandwidth); a missing bandwidth falls back to
    KeepClient.DEFAULT_TIMEOUT[2].
    """
    if not timeouts:
        return
    if isinstance(timeouts, tuple):
        if len(timeouts) == 2:
            conn_timeout, xfer_timeout = timeouts
            min_bandwidth = KeepClient.DEFAULT_TIMEOUT[2]
        else:
            conn_timeout, xfer_timeout, min_bandwidth = timeouts
    else:
        conn_timeout = xfer_timeout = timeouts
        min_bandwidth = KeepClient.DEFAULT_TIMEOUT[2]
    # pycurl takes the connect timeout in milliseconds; the LOW_SPEED
    # pair aborts the transfer if throughput stays below min_bandwidth
    # for xfer_timeout seconds (both rounded up to whole units).
    curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_timeout * 1000))
    curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_timeout)))
    curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(min_bandwidth)))
+
def _headerfunction(self, header_line):
    """pycurl HEADERFUNCTION callback: accumulate one response header line.

    Stores each header in self._headers keyed by its lower-cased name;
    the HTTP status line goes under 'x-status-line'.  Folded
    (obs-fold) continuation lines are appended to the most recently
    seen header's value.  Returning None tells pycurl all bytes were
    consumed.
    """
    header_line = header_line.decode('iso-8859-1')
    if ':' in header_line:
        name, value = header_line.split(':', 1)
        name = name.strip().lower()
        value = value.strip()
    elif header_line.startswith('HTTP/'):
        # Status line.  Checked before the continuation branch: a
        # follow-up status line (e.g. after "100 Continue") contains
        # no colon and would otherwise be glued onto the previous
        # header's value once self._headers is non-empty.
        name = 'x-status-line'
        value = header_line
    elif not header_line.strip():
        # Blank CRLF line terminating a header block -- not a folded
        # continuation.  Previously this appended a stray trailing
        # space to the last header's value.
        return
    elif self._headers:
        # Folded continuation of the previous header line.
        name = self._lastheadername
        value = self._headers[name] + ' ' + header_line.strip()
    else:
        _logger.error("Unexpected header line: %s", header_line)
        return
    self._lastheadername = name
    self._headers[name] = value
    # Returning None implies all bytes were written
+
+
class KeepWriterQueue(Queue.Queue):
    """Work queue shared by KeepWriterThread workers storing one block.

    Items are (service, service_root) pairs naming a Keep service to
    try writing to.  On top of the plain queue, this tracks write
    progress: how many replicas have been stored, how many attempts
    may currently be in flight, and the last successful response.
    """

    def __init__(self, copies):
        Queue.Queue.__init__(self) # Old-style superclass
        self.wanted_copies = copies      # replicas requested by the caller
        self.successful_copies = 0       # replicas confirmed written so far
        self.response = None             # response from the last successful write
        # Guards successful_copies / response.
        self.successful_copies_lock = threading.Lock()
        # Attempts allowed to start right now; replenished by
        # write_fail() so a different service can replace a failed one.
        self.pending_tries = copies
        # Guards pending_tries and signals state changes to workers
        # blocked in get_next_task().
        self.pending_tries_notification = threading.Condition()

    def write_success(self, response, replicas_nr):
        # Record that one request stored `replicas_nr` copies, then
        # wake every waiting worker so it can re-check pending_copies().
        with self.successful_copies_lock:
            self.successful_copies += replicas_nr
            self.response = response
        with self.pending_tries_notification:
            self.pending_tries_notification.notify_all()

    def write_fail(self, ks):
        # A write attempt failed: grant one more attempt slot and wake
        # a single waiting worker to pick up the next service.
        with self.pending_tries_notification:
            self.pending_tries += 1
            self.pending_tries_notification.notify()

    def pending_copies(self):
        # Replicas still needed; <= 0 once the write goal is met.
        with self.successful_copies_lock:
            return self.wanted_copies - self.successful_copies

    def get_next_task(self):
        """Return the next (service, service_root) pair to try.

        Blocks while no attempt slot is free.  Raises Queue.Empty when
        the work is over: either enough copies were written, or there
        are no services left to try.
        """
        with self.pending_tries_notification:
            while True:
                if self.pending_copies() < 1:
                    # This notify_all() is unnecessary --
                    # write_success() already called notify_all()
                    # when pending<1 became true, so it's not
                    # possible for any other thread to be in
                    # wait() now -- but it's cheap insurance
                    # against deadlock so we do it anyway:
                    self.pending_tries_notification.notify_all()
                    # Drain the queue and then raise Queue.Empty
                    # (get_nowait() raises it once the queue runs dry,
                    # and it propagates out of this method).
                    while True:
                        self.get_nowait()
                        self.task_done()
                elif self.pending_tries > 0:
                    # NOTE(review): if the queue is empty here,
                    # get_nowait() raises Queue.Empty immediately --
                    # presumably intended, since failed services are
                    # never re-queued; confirm against the workers.
                    service, service_root = self.get_nowait()
                    if service.finished():
                        # Service reports it is already done; skip it.
                        self.task_done()
                        continue
                    self.pending_tries -= 1
                    return service, service_root
                elif self.empty():
                    # No items and no free slots: nothing left to do.
                    self.pending_tries_notification.notify_all()
                    raise Queue.Empty
                else:
                    # Items remain but every attempt slot is taken:
                    # wait for a success or failure notification.
                    self.pending_tries_notification.wait()
+
+
class KeepWriterThreadPool(object):
    """Coordinate a set of KeepWriterThread workers storing one block.

    Sizes the worker pool from the requested replica count and the
    largest replica count a single service can provide, feeds write
    tasks to a shared KeepWriterQueue, and waits for completion.
    """

    def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
        self.total_task_nr = 0
        self.wanted_copies = copies
        # One worker suffices if a single service can satisfy every
        # requested copy; otherwise spread the copies across enough
        # workers to cover them all.
        if max_service_replicas and max_service_replicas < copies:
            num_threads = int(math.ceil(float(copies) / max_service_replicas))
        else:
            num_threads = 1
        _logger.debug("Pool max threads is %d", num_threads)
        self.queue = KeepClient.KeepWriterQueue(copies)
        # Workers are created now but only started by join().
        self.workers = [
            KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
            for _ in range(num_threads)
        ]

    def add_task(self, ks, service_root):
        """Queue one (service, service_root) write attempt."""
        self.queue.put((ks, service_root))
        self.total_task_nr += 1

    def done(self):
        """Return the number of replicas successfully written so far."""
        return self.queue.successful_copies

    def join(self):
        """Start the workers and block until all queued tasks finish."""
        for worker in self.workers:
            worker.start()
        self.queue.join()

    def response(self):
        """Return the last successful write's response (None if none yet)."""
        return self.queue.response
+