Merge branch 'master' into 14539-pysdk-empty-dir
[arvados.git] / services / fuse / tests / test_exec.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados_fuse.command
6 import json
7 import multiprocessing
8 import os
9 import run_test_server
10 import tempfile
11 import unittest
12
13 from .integration_test import workerPool
14
15 try:
16     from shlex import quote
17 except:
18     from pipes import quote
19
def try_exec(mnt, cmd):
    """Run ``cmd`` through the arvados_fuse command in ``--exec`` mode.

    A read-write mount with a ``zzz`` tmp mount is requested at ``mnt`` and
    ``cmd`` (a list of argv strings) is appended after ``--exec``.

    The run is expected to end with ``SystemExit``; that exception is
    swallowed here regardless of its exit code.  If ``run()`` returns
    normally instead, ``AssertionError`` is raised.
    """
    argv = [
        '--read-write',
        '--mount-tmp=zzz',
        '--unmount-timeout=0.1',
        mnt,
        '--exec',
    ] + cmd
    try:
        # Parsing and construction stay inside the try block so that a
        # SystemExit raised by any step is treated the same way.
        parser = arvados_fuse.command.ArgumentParser()
        mount = arvados_fuse.command.Mount(parser.parse_args(argv))
        mount.run()
    except SystemExit:
        return
    raise AssertionError('should have exited')
33
34
class ExecMode(unittest.TestCase):
    """Exercise the ``--exec`` mode of the arvados_fuse mount command.

    Each test gets a fresh temporary mount point (``self.mnt``) and a
    scratch file (``self.okfile``) that the executed command writes into.
    """

    @classmethod
    def setUpClass(cls):
        """Bring up the test services through run_test_server.

        Calls ``run()``, then ``run_keep()`` with permission enforcement and
        two servers, and finally authorizes as the ``'active'`` fixture.
        """
        run_test_server.run()
        run_test_server.run_keep(enforce_permissions=True, num_servers=2)
        run_test_server.authorize_with('active')

    @classmethod
    def tearDownClass(cls):
        """Stop the two Keep servers requested in setUpClass."""
        run_test_server.stop_keep(num_servers=2)

    def setUp(self):
        """Create the temporary mount directory and the scratch output file."""
        self.mnt = tempfile.mkdtemp()
        fd, self.okfile = tempfile.mkstemp()
        # mkstemp() hands back an open OS-level descriptor; only the path is
        # needed here, so close it right away instead of leaking it.
        os.close(fd)

    def tearDown(self):
        """Remove the mount directory and the scratch output file."""
        os.rmdir(self.mnt)
        os.unlink(self.okfile)

    def test_exec(self):
        """Write a file inside the mount and check the resulting manifest.

        The executed shell command writes ``foo`` to ``zzz/foo.txt`` under
        the mount point, then copies ``zzz/.arvados#collection`` out to
        ``self.okfile``.  That copy is parsed as JSON and its
        ``manifest_text`` must mention the 3-byte ``foo.txt``.
        """
        tmp_collection = os.path.join(self.mnt, 'zzz')
        shell_cmd = 'echo -n foo >{}; cp {} {}'.format(
            quote(os.path.join(tmp_collection, 'foo.txt')),
            quote(os.path.join(tmp_collection, '.arvados#collection')),
            quote(self.okfile))
        workerPool().apply(try_exec, (self.mnt, ['sh', '-c', shell_cmd]))

        # Close the file deterministically rather than relying on garbage
        # collection of an anonymous file object.
        with open(self.okfile) as f:
            collection = json.load(f)

        # assertRegexpMatches is a deprecated alias that was removed in
        # Python 3.12, while assertRegex does not exist on Python 2.
        assert_regex = (getattr(self, 'assertRegex', None)
                        or self.assertRegexpMatches)
        assert_regex(collection['manifest_text'], r' 0:3:foo.txt\n')