1 import arvados_fuse.command
10 from shlex import quote
12 from pipes import quote
14 def try_exec(mnt, cmd):
16 arvados_fuse.command.Mount(
17 arvados_fuse.command.ArgumentParser().parse_args([
20 '--unmount-timeout=0.1',
22 '--exec'] + cmd)).run()
26 raise AssertionError('should have exited')
29 class ExecMode(unittest.TestCase):
33 run_test_server.run_keep(enforce_permissions=True, num_servers=2)
34 run_test_server.authorize_with('active')
37 def tearDownClass(cls):
38 run_test_server.stop_keep(num_servers=2)
41 self.mnt = tempfile.mkdtemp()
42 _, self.okfile = tempfile.mkstemp()
43 self.pool = multiprocessing.Pool(1)
49 os.unlink(self.okfile)
52 self.pool.apply(try_exec, (self.mnt, [
54 'echo -n foo >{}; cp {} {}'.format(
55 quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
56 quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
57 quote(os.path.join(self.okfile)))]))
58 self.assertRegexpMatches(
59 json.load(open(self.okfile))['manifest_text'],