12870: Fix test
[arvados.git] / services / fuse / tests / test_tmp_collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: AGPL-3.0
4
5 import arvados
6 import arvados_fuse
7 import arvados_fuse.command
8 import json
9 import logging
10 import os
11 import tempfile
12 import unittest
13
14 from .integration_test import IntegrationTest
15 from .mount_test_base import MountTestBase
16
17 logger = logging.getLogger('arvados.arv-mount')
18
19
20 class TmpCollectionArgsTest(unittest.TestCase):
21     def setUp(self):
22         self.tmpdir = tempfile.mkdtemp()
23
24     def tearDown(self):
25         os.rmdir(self.tmpdir)
26
27     def test_tmp_only(self):
28         args = arvados_fuse.command.ArgumentParser().parse_args([
29             '--mount-tmp', 'tmp1',
30             '--mount-tmp', 'tmp2',
31             self.tmpdir,
32         ])
33         self.assertIn(args.mode, [None, 'custom'])
34         self.assertEqual(['tmp1', 'tmp2'], args.mount_tmp)
35         for mtype in ['home', 'shared', 'by_id', 'by_pdh', 'by_tag']:
36             self.assertEqual([], getattr(args, 'mount_'+mtype))
37
38     def test_tmp_and_home(self):
39         args = arvados_fuse.command.ArgumentParser().parse_args([
40             '--mount-tmp', 'test_tmp',
41             '--mount-home', 'test_home',
42             self.tmpdir,
43         ])
44         self.assertIn(args.mode, [None, 'custom'])
45         self.assertEqual(['test_tmp'], args.mount_tmp)
46         self.assertEqual(['test_home'], args.mount_home)
47
48     def test_no_tmp(self):
49         args = arvados_fuse.command.ArgumentParser().parse_args([
50             self.tmpdir,
51         ])
52         self.assertEqual([], args.mount_tmp)
53
54
55 def current_manifest(tmpdir):
56     return json.load(open(
57         os.path.join(tmpdir, '.arvados#collection')
58     ))['manifest_text']
59
60
61 class TmpCollectionTest(IntegrationTest):
62     mnt_args = [
63         '--read-write',
64         '--mount-tmp', 'zzz',
65     ]
66
67     @IntegrationTest.mount(argv=mnt_args+['--mount-tmp', 'yyy'])
68     def test_two_tmp(self):
69         self.pool_test(os.path.join(self.mnt, 'zzz'),
70                        os.path.join(self.mnt, 'yyy'))
71     @staticmethod
72     def _test_two_tmp(self, zzz, yyy):
73         self.assertEqual(current_manifest(zzz), "")
74         self.assertEqual(current_manifest(yyy), "")
75         with open(os.path.join(zzz, 'foo'), 'w') as f:
76             f.write('foo')
77         self.assertNotEqual(current_manifest(zzz), "")
78         self.assertEqual(current_manifest(yyy), "")
79         os.unlink(os.path.join(zzz, 'foo'))
80         with open(os.path.join(yyy, 'bar'), 'w') as f:
81             f.write('bar')
82         self.assertEqual(current_manifest(zzz), "")
83         self.assertNotEqual(current_manifest(yyy), "")
84
85     @IntegrationTest.mount(argv=mnt_args)
86     def test_tmp_empty(self):
87         self.pool_test(os.path.join(self.mnt, 'zzz'))
88     @staticmethod
89     def _test_tmp_empty(self, tmpdir):
90         self.assertEqual(current_manifest(tmpdir), "")
91
92     @IntegrationTest.mount(argv=mnt_args)
93     def test_tmp_onefile(self):
94         self.pool_test(os.path.join(self.mnt, 'zzz'))
95     @staticmethod
96     def _test_tmp_onefile(self, tmpdir):
97         with open(os.path.join(tmpdir, 'foo'), 'w') as f:
98             f.write('foo')
99         self.assertRegexpMatches(
100             current_manifest(tmpdir),
101             r'^\. acbd18db4cc2f85cedef654fccc4a4d8\+3(\+\S+)? 0:3:foo\n$')
102
103     @IntegrationTest.mount(argv=mnt_args)
104     def test_tmp_snapshots(self):
105         self.pool_test(os.path.join(self.mnt, 'zzz'))
106     @staticmethod
107     def _test_tmp_snapshots(self, tmpdir):
108         ops = [
109             ('foo', 'bar',
110              r'^\. 37b51d194a7513e45b56f6524f2d51f2\+3(\+\S+)? 0:3:foo\n$'),
111             ('foo', 'foo',
112              r'^\. acbd18db4cc2f85cedef654fccc4a4d8\+3(\+\S+)? 0:3:foo\n$'),
113             ('bar', 'bar',
114              r'^\. 37b51d194a7513e45b56f6524f2d51f2\+3(\+\S+)? acbd18db4cc2f85cedef654fccc4a4d8\+3(\+\S+)? 0:3:bar 3:3:foo\n$'),
115             ('foo', None,
116              r'^\. 37b51d194a7513e45b56f6524f2d51f2\+3(\+\S+)? 0:3:bar\n$'),
117             ('bar', None,
118              r'^$'),
119         ]
120         for _ in range(10):
121             for fn, content, expect in ops:
122                 path = os.path.join(tmpdir, fn)
123                 if content is None:
124                     os.unlink(path)
125                 else:
126                     with open(path, 'w') as f:
127                         f.write(content)
128                 self.assertRegexpMatches(current_manifest(tmpdir), expect)
129
130     @IntegrationTest.mount(argv=mnt_args)
131     def test_tmp_rewrite(self):
132         self.pool_test(os.path.join(self.mnt, 'zzz'))
133     @staticmethod
134     def _test_tmp_rewrite(self, tmpdir):
135         with open(os.path.join(tmpdir, "b1"), 'w') as f:
136             f.write("b1")
137         with open(os.path.join(tmpdir, "b2"), 'w') as f:
138             f.write("b2")
139         with open(os.path.join(tmpdir, "b1"), 'w') as f:
140             f.write("1b")
141         self.assertRegexpMatches(current_manifest(tmpdir), "^\. ed4f3f67c70b02b29c50ce1ea26666bd\+4(\+\S+)? 0:2:b1 2:2:b2\n$")