#!/usr/bin/env python
import arvados
+import contextlib
import errno
import hashlib
import httplib
import pycurl
import Queue
import shutil
+import sys
import tempfile
import unittest
def str_keep_locator(s):
    """Build a locator string of the form ``<md5 hexdigest>+<length>`` for *s*."""
    digest = hashlib.md5(s).hexdigest()
    size = len(s)
    return '{}+{}'.format(digest, size)
@contextlib.contextmanager
def redirected_streams(stdout=None, stderr=None):
    """Temporarily replace ``sys.stdout`` and/or ``sys.stderr``.

    Each argument that is not ``None`` is installed as the corresponding
    ``sys`` stream for the duration of the ``with`` block.  A stream left
    as ``None`` is not redirected.  The streams that were in place on
    entry are put back on exit, even if the block raises.
    """
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    # Compare against None instead of relying on truthiness, so that a
    # replacement stream that happens to be falsy (for example one that
    # defines __len__) is still installed.
    if stdout is not None:
        sys.stdout = stdout
    if stderr is not None:
        sys.stderr = stderr
    try:
        yield
    finally:
        sys.stdout = orig_stdout
        sys.stderr = orig_stderr
+
+
class FakeCurl:
@classmethod
def make(cls, code, body='', headers={}):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import multiprocessing
+import io
import os
import sys
import tempfile
import unittest
import arvados.commands.arv_copy as arv_copy
+import arvados_testutil as tutil
class ArvCopyTestCase(unittest.TestCase):
def run_copy(self, args):
    # main() is called without arguments here, so the command line is
    # supplied through sys.argv before the call.
    command_line = ['arv-copy'] + args
    sys.argv = command_line
    return arv_copy.main()
- def run_copy_process(self, args):
- _, stdout_path = tempfile.mkstemp()
- _, stderr_path = tempfile.mkstemp()
- def wrap():
- def wrapper():
- sys.argv = ['arv-copy'] + args
- sys.stdout = open(stdout_path, 'w')
- sys.stderr = open(stderr_path, 'w')
- arv_copy.main()
- return wrapper
- p = multiprocessing.Process(target=wrap())
- p.start()
- p.join()
- out = open(stdout_path, 'r').read()
- err = open(stderr_path, 'r').read()
- os.unlink(stdout_path)
- os.unlink(stderr_path)
- return p.exitcode, out, err
-
def test_unsupported_arg(self):
    # An unrecognised option must terminate the command via SystemExit.
    self.assertRaises(SystemExit, self.run_copy, ['-x=unknown'])
def test_version_argument(self):
    # --version must exit with status 0, write nothing to stdout and
    # report a dotted version number on stderr.
    err = io.BytesIO()
    out = io.BytesIO()
    with tutil.redirected_streams(stdout=out, stderr=err):
        with self.assertRaises(SystemExit) as exit_ctx:
            self.run_copy(['--version'])
    # assertRaises alone accepts any exit status; pin the success
    # status explicitly.
    self.assertEqual(0, exit_ctx.exception.code)
    self.assertEqual(out.getvalue(), '')
    self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import multiprocessing
+import io
import os
import sys
import tempfile
import unittest
import arvados.commands.keepdocker as arv_keepdocker
+import arvados_testutil as tutil
class ArvKeepdockerTestCase(unittest.TestCase):
sys.argv = ['arv-keepdocker'] + args
return arv_keepdocker.main()
- def run_arv_keepdocker_process(self, args):
- _, stdout_path = tempfile.mkstemp()
- _, stderr_path = tempfile.mkstemp()
- def wrap():
- def wrapper():
- sys.argv = ['arv-keepdocker'] + args
- sys.stdout = open(stdout_path, 'w')
- sys.stderr = open(stderr_path, 'w')
- arv_keepdocker.main()
- return wrapper
- p = multiprocessing.Process(target=wrap())
- p.start()
- p.join()
- out = open(stdout_path, 'r').read()
- err = open(stderr_path, 'r').read()
- os.unlink(stdout_path)
- os.unlink(stderr_path)
- return p.exitcode, out, err
-
def test_unsupported_arg(self):
    # An unrecognised option must terminate the command via SystemExit.
    self.assertRaises(SystemExit, self.run_arv_keepdocker, ['-x=unknown'])
def test_version_argument(self):
    # --version must exit with status 0, write nothing to stdout and
    # report a dotted version number on stderr.
    err = io.BytesIO()
    out = io.BytesIO()
    with tutil.redirected_streams(stdout=out, stderr=err):
        with self.assertRaises(SystemExit) as exit_ctx:
            self.run_arv_keepdocker(['--version'])
    # assertRaises alone accepts any exit status; pin the success
    # status explicitly.
    self.assertEqual(0, exit_ctx.exception.code)
    self.assertEqual(out.getvalue(), '')
    self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
import sys
import mock
import tempfile
-import multiprocessing
import arvados.errors as arv_error
import arvados.commands.ls as arv_ls
import run_test_server
-from arvados_testutil import str_keep_locator
+from arvados_testutil import str_keep_locator, redirected_streams
class ArvLsTestCase(run_test_server.TestCaseWithServers):
FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
self.stderr = io.BytesIO()
return arv_ls.main(args, self.stdout, self.stderr, api_client)
- def run_ls_process(self, args=[], api_client=None):
- _, stdout_path = tempfile.mkstemp()
- _, stderr_path = tempfile.mkstemp()
- def wrap():
- def wrapper(*args, **kwargs):
- sys.stdout = open(stdout_path, 'w')
- sys.stderr = open(stderr_path, 'w')
- arv_ls.main(*args, **kwargs)
- return wrapper
- p = multiprocessing.Process(target=wrap(),
- args=(args, sys.stdout, sys.stderr, api_client))
- p.start()
- p.join()
- out = open(stdout_path, 'r').read()
- err = open(stderr_path, 'r').read()
- os.unlink(stdout_path)
- os.unlink(stderr_path)
- return p.exitcode, out, err
-
def test_plain_listing(self):
collection, api_client = self.mock_api_for_manifest(
['. {} 0:3:one.txt 3:4:two.txt'.format(self.random_blocks(5, 2)),
self.assertNotEqual('', self.stderr.getvalue())
def test_version_argument(self):
    # --version must exit with status 0, write nothing to the process
    # stdout and report a dotted version number on the process stderr.
    err = io.BytesIO()
    out = io.BytesIO()
    with redirected_streams(stdout=out, stderr=err):
        with self.assertRaises(SystemExit) as exit_ctx:
            self.run_ls(['--version'], None)
    # assertRaises alone accepts any exit status; pin the success
    # status explicitly.
    self.assertEqual(0, exit_ctx.exception.code)
    self.assertEqual(out.getvalue(), '')
    self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
# -*- coding: utf-8 -*-
import apiclient
+import io
import mock
import os
import pwd
import threading
import hashlib
import random
-import multiprocessing
from cStringIO import StringIO
'098f6bcd4621d373cade4e832627b4f6')),
"did not find file stream in Keep store")
- def run_main_process(self, args):
- _, stdout_path = tempfile.mkstemp()
- _, stderr_path = tempfile.mkstemp()
- def wrap():
- def wrapper(*args, **kwargs):
- sys.stdout = open(stdout_path, 'w')
- sys.stderr = open(stderr_path, 'w')
- arv_put.main(*args, **kwargs)
- return wrapper
- p = multiprocessing.Process(target=wrap(), args=(args, sys.stdout, sys.stderr))
- p.start()
- p.join()
- out = open(stdout_path, 'r').read()
- err = open(stderr_path, 'r').read()
- os.unlink(stdout_path)
- os.unlink(stderr_path)
- return p.exitcode, out, err
-
def setUp(self):
super(ArvadosPutTest, self).setUp()
run_test_server.authorize_with('active')
super(ArvadosPutTest, self).tearDown()
def test_version_argument(self):
    # --version must exit with status 0, write nothing to stdout and
    # report a dotted version number on stderr.
    err = io.BytesIO()
    out = io.BytesIO()
    with tutil.redirected_streams(stdout=out, stderr=err):
        with self.assertRaises(SystemExit) as exit_ctx:
            self.call_main_with_args(['--version'])
    # assertRaises alone accepts any exit status; pin the success
    # status explicitly.
    self.assertEqual(0, exit_ctx.exception.code)
    self.assertEqual(out.getvalue(), '')
    self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
def test_simple_file_put(self):
self.call_main_on_test_file()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
-import multiprocessing
+import io
import os
import sys
import tempfile
import unittest
import arvados.commands.run as arv_run
+import arvados_testutil as tutil
class ArvRunTestCase(unittest.TestCase):
def run_arv_run(self, args):
    # main() is called without arguments here, so the command line is
    # supplied through sys.argv before the call.
    command_line = ['arv-run'] + args
    sys.argv = command_line
    return arv_run.main()
- def run_arv_run_process(self, args):
- _, stdout_path = tempfile.mkstemp()
- _, stderr_path = tempfile.mkstemp()
- def wrap():
- def wrapper():
- sys.argv = ['arv-run'] + args
- sys.stdout = open(stdout_path, 'w')
- sys.stderr = open(stderr_path, 'w')
- arv_run.main()
- return wrapper
- p = multiprocessing.Process(target=wrap())
- p.start()
- p.join()
- out = open(stdout_path, 'r').read()
- err = open(stderr_path, 'r').read()
- os.unlink(stdout_path)
- os.unlink(stderr_path)
- return p.exitcode, out, err
-
def test_unsupported_arg(self):
    # An unrecognised option must terminate the command via SystemExit.
    self.assertRaises(SystemExit, self.run_arv_run, ['-x=unknown'])
def test_version_argument(self):
    # --version must exit with status 0, write nothing to stdout and
    # report a dotted version number on stderr.
    err = io.BytesIO()
    out = io.BytesIO()
    with tutil.redirected_streams(stdout=out, stderr=err):
        with self.assertRaises(SystemExit) as exit_ctx:
            self.run_arv_run(['--version'])
    # assertRaises alone accepts any exit status; pin the success
    # status explicitly.
    self.assertEqual(0, exit_ctx.exception.code)
    self.assertEqual(out.getvalue(), '')
    self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
#!/usr/bin/env python
-import multiprocessing
+import io
import os
import sys
import tempfile
import arvados.errors as arv_error
import arvados.commands.ws as arv_ws
+import arvados_testutil as tutil
class ArvWsTestCase(unittest.TestCase):
def run_ws(self, args):
    # Thin wrapper: hand the argument list straight to arv_ws.main()
    # and pass its return value back to the caller.
    result = arv_ws.main(args)
    return result
- def run_ws_process(self, args=[], api_client=None):
- _, stdout_path = tempfile.mkstemp()
- _, stderr_path = tempfile.mkstemp()
- def wrap():
- def wrapper(*args, **kwargs):
- sys.stdout = open(stdout_path, 'w')
- sys.stderr = open(stderr_path, 'w')
- arv_ws.main(*args, **kwargs)
- return wrapper
- p = multiprocessing.Process(target=wrap(), args=(args,))
- p.start()
- p.join()
- out = open(stdout_path, 'r').read()
- err = open(stderr_path, 'r').read()
- os.unlink(stdout_path)
- os.unlink(stderr_path)
- return p.exitcode, out, err
-
def test_unsupported_arg(self):
    # An unrecognised option must terminate the command via SystemExit.
    self.assertRaises(SystemExit, self.run_ws, ['-x=unknown'])
def test_version_argument(self):
    # --version must exit with status 0, write nothing to stdout and
    # report a dotted version number on stderr.
    err = io.BytesIO()
    out = io.BytesIO()
    with tutil.redirected_streams(stdout=out, stderr=err):
        with self.assertRaises(SystemExit) as exit_ctx:
            self.run_ws(['--version'])
    # assertRaises alone accepts any exit status; pin the success
    # status explicitly.
    self.assertEqual(0, exit_ctx.exception.code)
    self.assertEqual(out.getvalue(), '')
    self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
#!/usr/bin/env python
-import multiprocessing
+import io
import os
import sys
import tempfile
import unittest
import arvnodeman.launcher as nodeman
+from . import testutil
class ArvNodemArgumentsTestCase(unittest.TestCase):
def run_nodeman(self, args):
    # Thin wrapper: hand the argument list straight to nodeman.main()
    # and pass its return value back to the caller.
    result = nodeman.main(args)
    return result
- def run_nodeman_process(self, args=[]):
- _, stdout_path = tempfile.mkstemp()
- _, stderr_path = tempfile.mkstemp()
- def wrap():
- def wrapper(*args, **kwargs):
- sys.stdout = open(stdout_path, 'w')
- sys.stderr = open(stderr_path, 'w')
- nodeman.main(*args, **kwargs)
- return wrapper
- p = multiprocessing.Process(target=wrap(), args=(args,))
- p.start()
- p.join()
- out = open(stdout_path, 'r').read()
- err = open(stderr_path, 'r').read()
- os.unlink(stdout_path)
- os.unlink(stderr_path)
- return p.exitcode, out, err
-
def test_unsupported_arg(self):
    # An unrecognised option must terminate the command via SystemExit.
    self.assertRaises(SystemExit, self.run_nodeman, ['-x=unknown'])
def test_version_argument(self):
    # --version must exit with status 0, write nothing to stdout and
    # report a dotted version number on stderr.
    err = io.BytesIO()
    out = io.BytesIO()
    with testutil.redirected_streams(stdout=out, stderr=err):
        with self.assertRaises(SystemExit) as exit_ctx:
            self.run_nodeman(['--version'])
    # assertRaises alone accepts any exit status; pin the success
    # status explicitly.
    self.assertEqual(0, exit_ctx.exception.code)
    self.assertEqual(out.getvalue(), '')
    self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
from __future__ import absolute_import, print_function
+import contextlib
import datetime
+import mock
+import pykka
+import sys
import threading
import time
import libcloud.common.types as cloud_types
-import mock
-import pykka
from . import pykka_timeout
def ip_address_mock(last_octet):
    """Return a ``10.20.30.x`` address string ending in *last_octet*."""
    template = '10.20.30.{}'
    return template.format(last_octet)
@contextlib.contextmanager
def redirected_streams(stdout=None, stderr=None):
    """Temporarily replace ``sys.stdout`` and/or ``sys.stderr``.

    Each argument that is not ``None`` is installed as the corresponding
    ``sys`` stream for the duration of the ``with`` block.  A stream left
    as ``None`` is not redirected.  The streams that were in place on
    entry are put back on exit, even if the block raises.
    """
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    # Compare against None instead of relying on truthiness, so that a
    # replacement stream that happens to be falsy (for example one that
    # defines __len__) is still installed.
    if stdout is not None:
        sys.stdout = stdout
    if stderr is not None:
        sys.stderr = stderr
    try:
        yield
    finally:
        sys.stdout = orig_stdout
        sys.stderr = orig_stderr
+
+
class MockShutdownTimer(object):
def _set_state(self, is_open, next_opening):
self.window_open = lambda: is_open