Merge branch '10813-arv-put-six-threads'
[arvados.git] / sdk / python / arvados / keep.py
index f318ea7a75aa9621fa7f8f692a0d9f6e94df4647..38f332b38e2d51aae9b6a3fa5007a59aad6b006f 100644 (file)
@@ -296,14 +296,14 @@ class KeepClient(object):
 
         def _get_user_agent(self):
             try:
-                return self._user_agent_pool.get(False)
+                return self._user_agent_pool.get(block=False)  # non-blocking; Queue.Empty is handled below by creating a new handle
             except Queue.Empty:
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
             try:
                 ua.reset()
-                self._user_agent_pool.put(ua, False)
+                self._user_agent_pool.put(ua, block=False)  # non-blocking; any error (e.g. a full pool) falls through to ua.close()
             except:
                 ua.close()
 
@@ -527,13 +527,24 @@ class KeepClient(object):
             with self.pending_tries_notification:
                 while True:
                     if self.pending_copies() < 1:
+                        # This notify_all() is unnecessary --
+                        # write_success() already called notify_all()
+                        # when pending<1 became true, so it's not
+                        # possible for any other thread to be in
+                        # wait() now -- but it's cheap insurance
+                        # against deadlock so we do it anyway:
+                        self.pending_tries_notification.notify_all()
                         # Drain the queue and then raise Queue.Empty
                         while True:
                             self.get_nowait()
                             self.task_done()
                     elif self.pending_tries > 0:
+                        service, service_root = self.get_nowait()
+                        if service.finished():
+                            self.task_done()
+                            continue
                         self.pending_tries -= 1
-                        return self.get_nowait()
+                        return service, service_root
                     elif self.empty():
                         self.pending_tries_notification.notify_all()
                         raise Queue.Empty
@@ -595,7 +606,7 @@ class KeepClient(object):
                 try:
                     locator, copies = self.do_task(service, service_root)
                 except Exception as e:
-                    if e != self.TaskFailed:
+                    if e is not self.TaskFailed:
                         _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
@@ -604,8 +615,6 @@ class KeepClient(object):
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
-            if service.finished():
-                return
             success = bool(service.put(self.data_hash,
                                         self.data,
                                         timeout=self.timeout))