X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/86f04235021d84afa0d28d105111422e0dd15738..c8aa6553fd4af8be3ca9b1d9d9d660750cd59d1b:/sdk/python/arvados/keep.py diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py index f318ea7a75..38f332b38e 100644 --- a/sdk/python/arvados/keep.py +++ b/sdk/python/arvados/keep.py @@ -296,14 +296,14 @@ class KeepClient(object): def _get_user_agent(self): try: - return self._user_agent_pool.get(False) + return self._user_agent_pool.get(block=False) except Queue.Empty: return pycurl.Curl() def _put_user_agent(self, ua): try: ua.reset() - self._user_agent_pool.put(ua, False) + self._user_agent_pool.put(ua, block=False) except: ua.close() @@ -527,13 +527,24 @@ class KeepClient(object): with self.pending_tries_notification: while True: if self.pending_copies() < 1: + # This notify_all() is unnecessary -- + # write_success() already called notify_all() + # when pending<1 became true, so it's not + # possible for any other thread to be in + # wait() now -- but it's cheap insurance + # against deadlock so we do it anyway: + self.pending_tries_notification.notify_all() # Drain the queue and then raise Queue.Empty while True: self.get_nowait() self.task_done() elif self.pending_tries > 0: + service, service_root = self.get_nowait() + if service.finished(): + self.task_done() + continue self.pending_tries -= 1 - return self.get_nowait() + return service, service_root elif self.empty(): self.pending_tries_notification.notify_all() raise Queue.Empty @@ -595,7 +606,7 @@ class KeepClient(object): try: locator, copies = self.do_task(service, service_root) except Exception as e: - if e != self.TaskFailed: + if e is not self.TaskFailed: _logger.exception("Exception in KeepWriterThread") self.queue.write_fail(service) else: @@ -604,8 +615,6 @@ class KeepClient(object): self.queue.task_done() def do_task(self, service, service_root): - if service.finished(): - return success = bool(service.put(self.data_hash, self.data, timeout=self.timeout))