from __future__ import absolute_import, print_function
+import contextlib
import datetime
+import mock
+import pykka
+import sys
import threading
import time
import libcloud.common.types as cloud_types
-import mock
-import pykka
from . import pykka_timeout
def ip_address_mock(last_octet):
    """Return a fake dotted-quad address in the 10.20.30.x range.

    ``last_octet`` is formatted into the final position as-is; no range
    or type validation is performed.
    """
    template = '10.20.30.{}'
    return template.format(last_octet)
@contextlib.contextmanager
def redirected_streams(stdout=None, stderr=None):
    """Temporarily replace ``sys.stdout`` and/or ``sys.stderr``.

    Arguments:
      stdout: object to install as ``sys.stdout`` for the duration of the
        ``with`` block, or None to leave ``sys.stdout`` alone.
      stderr: object to install as ``sys.stderr`` for the duration of the
        ``with`` block, or None to leave ``sys.stderr`` alone.

    The streams that were installed on entry are put back on exit, even
    if the body of the ``with`` block raises.
    """
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    # Compare against None explicitly rather than relying on truthiness:
    # a replacement stream that happens to evaluate as false (e.g. an
    # object defining __len__ that returns 0) must still be installed.
    if stdout is not None:
        sys.stdout = stdout
    if stderr is not None:
        sys.stderr = stderr
    try:
        yield
    finally:
        sys.stdout = orig_stdout
        sys.stderr = orig_stderr
+
+
class MockShutdownTimer(object):
def _set_state(self, is_open, next_opening):
self.window_open = lambda: is_open