11308: Futurize.
[arvados.git] / sdk / python / arvados / keep.py
index 776c9b2a7d7159f6fbd8c99b20db55b2047aefb0..4efa6982a1fa346e3ae6c55a0e72f892968173f1 100644 (file)
@@ -1,3 +1,4 @@
+from __future__ import absolute_import
 import cStringIO
 import datetime
 import hashlib
@@ -11,7 +12,7 @@ import socket
 import ssl
 import sys
 import threading
-import timer
+from . import timer
 import urlparse
 
 import arvados
@@ -296,14 +297,14 @@ class KeepClient(object):
 
         def _get_user_agent(self):
             try:
-                return self._user_agent_pool.get(False)
+                return self._user_agent_pool.get(block=False)
             except Queue.Empty:
                 return pycurl.Curl()
 
         def _put_user_agent(self, ua):
             try:
                 ua.reset()
-                self._user_agent_pool.put(ua, False)
+                self._user_agent_pool.put(ua, block=False)
             except:
                 ua.close()
 
@@ -527,13 +528,24 @@ class KeepClient(object):
             with self.pending_tries_notification:
                 while True:
                     if self.pending_copies() < 1:
+                        # This notify_all() is unnecessary --
+                        # write_success() already called notify_all()
+                        # when pending<1 became true, so it's not
+                        # possible for any other thread to be in
+                        # wait() now -- but it's cheap insurance
+                        # against deadlock so we do it anyway:
+                        self.pending_tries_notification.notify_all()
                         # Drain the queue and then raise Queue.Empty
                         while True:
                             self.get_nowait()
                             self.task_done()
                     elif self.pending_tries > 0:
+                        service, service_root = self.get_nowait()
+                        if service.finished():
+                            self.task_done()
+                            continue
                         self.pending_tries -= 1
-                        return self.get_nowait()
+                        return service, service_root
                     elif self.empty():
                         self.pending_tries_notification.notify_all()
                         raise Queue.Empty
@@ -576,6 +588,8 @@ class KeepClient(object):
     
     
     class KeepWriterThread(threading.Thread):
+        TaskFailed = RuntimeError()
+
         def __init__(self, queue, data, data_hash, timeout=None):
             super(KeepClient.KeepWriterThread, self).__init__()
             self.timeout = timeout
@@ -593,7 +607,8 @@ class KeepClient(object):
                 try:
                     locator, copies = self.do_task(service, service_root)
                 except Exception as e:
-                    _logger.exception("Exception in KeepWriterThread")
+                    if e is not self.TaskFailed:
+                        _logger.exception("Exception in KeepWriterThread")
                     self.queue.write_fail(service)
                 else:
                     self.queue.write_success(locator, copies)
@@ -601,8 +616,6 @@ class KeepClient(object):
                     self.queue.task_done()
 
         def do_task(self, service, service_root):
-            if service.finished():
-                return
             success = bool(service.put(self.data_hash,
                                         self.data,
                                         timeout=self.timeout))
@@ -614,7 +627,7 @@ class KeepClient(object):
                                   self.data_hash,
                                   result['status_code'],
                                   result['body'])
-                raise RuntimeError()
+                raise self.TaskFailed
 
             _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
                           str(threading.current_thread()),