1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: AGPL-3.0
5 import arvados_fuse.command
13 from . import run_test_server
14 from .integration_test import workerPool
# Run an arvados_fuse mount in "--exec" mode on mount point `mnt`, with the
# argument list `cmd` appended after the '--exec' flag.
#
# NOTE(review): the embedded line numbers skip here (17, 21-22, 24, 26-28),
# so some statements of this function are not visible in this view --
# presumably a try/except around the call and further mount arguments.
# Confirm against the full file before changing the control flow.
16 def try_exec(mnt, cmd):
# Point KEEP_LOCAL_STORE at a freshly created temporary directory.
18 os.environ['KEEP_LOCAL_STORE'] = tempfile.mkdtemp()
# Build the Mount from parsed command-line arguments and run it.
19 arvados_fuse.command.Mount(
20 arvados_fuse.command.ArgumentParser().parse_args([
23 '--unmount-timeout=0.1',
25 '--exec'] + cmd)).run()
# Presumably reached only when run() returns instead of exiting -- the
# guarding branch is not visible here; verify.
29 raise AssertionError('should have exited')
# Test case that drives try_exec through a worker pool and checks the
# collection manifest produced by the executed shell command.
#
# NOTE(review): several method headers and decorators are missing from this
# view (the embedded numbering skips), so the grouping of the statements
# below into methods is inferred from the one visible header (tearDownClass)
# and should be confirmed against the full file.
32 class ExecMode(unittest.TestCase):
# Class-level setup: start two Keep servers with blob signing enabled and
# authorize as the 'active' user.
36 run_test_server.run_keep(blob_signing=True, num_servers=2)
37 run_test_server.authorize_with('active')
# Class-level teardown: stop the two Keep servers started above.
40 def tearDownClass(cls):
41 run_test_server.stop_keep(num_servers=2)
# Per-test fixtures: a temporary directory used as the mount point and a
# temporary file that the executed command writes its result into.
44 self.mnt = tempfile.mkdtemp()
# NOTE(review): the file descriptor returned by mkstemp() is discarded and
# is not closed in the lines visible here.
45 _, self.okfile = tempfile.mkstemp()
# Per-test cleanup: remove the result file.
49 os.unlink(self.okfile)
# Run try_exec via workerPool().apply(). The shell command writes "foo" to
# <mnt>/zzz/foo.txt, then copies <mnt>/zzz/.arvados#collection to okfile.
# All three paths are shell-quoted.
52 workerPool().apply(try_exec, (self.mnt, [
53 'sh', '-c', 'echo -n foo >{}; cp {} {}'.format(
54 shlex.quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
55 shlex.quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
56 shlex.quote(os.path.join(self.okfile)),
# The copied file is parsed as JSON; its 'manifest_text' must contain an
# entry for the 3-byte file foo.txt at offset 0.
58 with open(self.okfile) as f:
59 self.assertRegex(json.load(f)['manifest_text'], r' 0:3:foo.txt\n')