#
# SPDX-License-Identifier: AGPL-3.0
+from __future__ import absolute_import
+from six import assertRegex
import arvados_fuse.command
import json
import multiprocessing
import os
-import run_test_server
+from . import run_test_server
import tempfile
import unittest
+from .integration_test import workerPool
+
try:
from shlex import quote
except:
def setUp(self):
    """Create the per-test scratch directory and scratch file.

    Sets ``self.mnt`` to a fresh temporary directory and ``self.okfile``
    to the path of a fresh, empty temporary file.
    NOTE(review): ``self.mnt`` is presumably the FUSE mount point handed
    to ``try_exec`` -- confirm against that helper.
    """
    self.mnt = tempfile.mkdtemp()
    # mkstemp() returns an *open* OS-level descriptor together with the
    # path.  Only the path is needed, so close the descriptor right away
    # instead of leaking one per test.
    fd, self.okfile = tempfile.mkstemp()
    os.close(fd)
def tearDown(self):
    """Remove the scratch directory and scratch file made by ``setUp``.

    Raises:
        OSError: if ``self.mnt`` cannot be removed (for example it is
            not empty) or ``self.okfile`` cannot be unlinked.
    """
    try:
        os.rmdir(self.mnt)
    finally:
        # Always delete the scratch file, even when removing the
        # directory failed, so a broken test does not leave temp files
        # behind.  The rmdir error still propagates to the caller.
        os.unlink(self.okfile)
def test_exec(self):
    """Write a file under the mount and check the collection manifest.

    Runs ``try_exec`` through the pool returned by ``workerPool()``,
    passing ``self.mnt`` and a shell command that writes ``foo`` to
    ``zzz/foo.txt`` under ``self.mnt`` and then copies
    ``zzz/.arvados#collection`` to ``self.okfile``.  The copied file is
    parsed as JSON and its ``manifest_text`` must contain an entry for a
    3-byte ``foo.txt``.
    """
    # Every path is shell-quoted because it is interpolated into the
    # ``sh -c`` command line.
    command = 'echo -n foo >{}; cp {} {}'.format(
        quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
        quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
        quote(self.okfile))
    workerPool().apply(try_exec, (self.mnt, ['sh', '-c', command]))
    # Context manager so the result file is closed before tearDown
    # unlinks it.
    with open(self.okfile) as f:
        assertRegex(
            self,
            json.load(f)['manifest_text'],
            r' 0:3:foo.txt\n')