2752: Resumed collection writer doesn't do_queued_work immediately.
[arvados.git] / sdk / python / tests / test_arv-put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import os
5 import re
6 import shutil
7 import subprocess
8 import sys
9 import tempfile
10 import time
11 import unittest
12
13 import arvados
14 import arvados.commands.put as arv_put
15 from arvados_testutil import ArvadosBaseTestCase, ArvadosKeepLocalStoreTestCase
16
17 class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
18     CACHE_ARGSET = [
19         [],
20         ['/dev/null'],
21         ['/dev/null', '--filename', 'empty'],
22         ['/tmp'],
23         ['/tmp', '--max-manifest-depth', '0'],
24         ['/tmp', '--max-manifest-depth', '1']
25         ]
26
27     def tearDown(self):
28         super(ArvadosPutResumeCacheTest, self).tearDown()
29         try:
30             self.last_cache.destroy()
31         except AttributeError:
32             pass
33
34     def cache_path_from_arglist(self, arglist):
35         return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))
36
37     def test_cache_names_stable(self):
38         for argset in self.CACHE_ARGSET:
39             self.assertEquals(self.cache_path_from_arglist(argset),
40                               self.cache_path_from_arglist(argset),
41                               "cache name changed for {}".format(argset))
42
43     def test_cache_names_unique(self):
44         results = []
45         for argset in self.CACHE_ARGSET:
46             path = self.cache_path_from_arglist(argset)
47             self.assertNotIn(path, results)
48             results.append(path)
49
50     def test_cache_names_simple(self):
51         # The goal here is to make sure the filename doesn't use characters
52         # reserved by the filesystem.  Feel free to adjust this regexp as
53         # long as it still does that.
54         bad_chars = re.compile(r'[^-\.\w]')
55         for argset in self.CACHE_ARGSET:
56             path = self.cache_path_from_arglist(argset)
57             self.assertFalse(bad_chars.search(os.path.basename(path)),
58                              "path too exotic: {}".format(path))
59
60     def test_cache_names_ignore_argument_order(self):
61         self.assertEquals(
62             self.cache_path_from_arglist(['a', 'b', 'c']),
63             self.cache_path_from_arglist(['c', 'a', 'b']))
64         self.assertEquals(
65             self.cache_path_from_arglist(['-', '--filename', 'stdin']),
66             self.cache_path_from_arglist(['--filename', 'stdin', '-']))
67
68     def test_cache_names_ignore_irrelevant_arguments(self):
69         # Workaround: parse_arguments bails on --filename with a directory.
70         path1 = self.cache_path_from_arglist(['/tmp'])
71         args = arv_put.parse_arguments(['/tmp'])
72         args.filename = 'tmp'
73         path2 = arv_put.ResumeCache.make_path(args)
74         self.assertEquals(path1, path2,
75                          "cache path considered --filename for directory")
76         self.assertEquals(
77             self.cache_path_from_arglist(['-']),
78             self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
79             "cache path considered --max-manifest-depth for file")
80
81     def test_cache_names_treat_negative_manifest_depths_identically(self):
82         base_args = ['/tmp', '--max-manifest-depth']
83         self.assertEquals(
84             self.cache_path_from_arglist(base_args + ['-1']),
85             self.cache_path_from_arglist(base_args + ['-2']))
86
87     def test_cache_names_treat_stdin_consistently(self):
88         self.assertEquals(
89             self.cache_path_from_arglist(['-', '--filename', 'test']),
90             self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
91
92     def test_cache_names_identical_for_synonymous_names(self):
93         self.assertEquals(
94             self.cache_path_from_arglist(['.']),
95             self.cache_path_from_arglist([os.path.realpath('.')]))
96         testdir = self.make_tmpdir()
97         looplink = os.path.join(testdir, 'loop')
98         os.symlink(testdir, looplink)
99         self.assertEquals(
100             self.cache_path_from_arglist([testdir]),
101             self.cache_path_from_arglist([looplink]))
102
103     def test_cache_names_different_by_api_host(self):
104         config = arvados.config.settings()
105         orig_host = config.get('ARVADOS_API_HOST')
106         try:
107             name1 = self.cache_path_from_arglist(['.'])
108             config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
109             self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
110         finally:
111             if orig_host is None:
112                 del config['ARVADOS_API_HOST']
113             else:
114                 config['ARVADOS_API_HOST'] = orig_host
115
116     def test_basic_cache_storage(self):
117         thing = ['test', 'list']
118         with tempfile.NamedTemporaryFile() as cachefile:
119             self.last_cache = arv_put.ResumeCache(cachefile.name)
120         self.last_cache.save(thing)
121         self.assertEquals(thing, self.last_cache.load())
122
123     def test_empty_cache(self):
124         with tempfile.NamedTemporaryFile() as cachefile:
125             cache = arv_put.ResumeCache(cachefile.name)
126         self.assertRaises(ValueError, cache.load)
127
128     def test_cache_persistent(self):
129         thing = ['test', 'list']
130         path = os.path.join(self.make_tmpdir(), 'cache')
131         cache = arv_put.ResumeCache(path)
132         cache.save(thing)
133         cache.close()
134         self.last_cache = arv_put.ResumeCache(path)
135         self.assertEquals(thing, self.last_cache.load())
136
137     def test_multiple_cache_writes(self):
138         thing = ['short', 'list']
139         with tempfile.NamedTemporaryFile() as cachefile:
140             self.last_cache = arv_put.ResumeCache(cachefile.name)
141         # Start writing an object longer than the one we test, to make
142         # sure the cache file gets truncated.
143         self.last_cache.save(['long', 'long', 'list'])
144         self.last_cache.save(thing)
145         self.assertEquals(thing, self.last_cache.load())
146
147     def test_cache_is_locked(self):
148         with tempfile.NamedTemporaryFile() as cachefile:
149             cache = arv_put.ResumeCache(cachefile.name)
150             self.assertRaises(arv_put.ResumeCacheConflict,
151                               arv_put.ResumeCache, cachefile.name)
152
153     def test_cache_stays_locked(self):
154         with tempfile.NamedTemporaryFile() as cachefile:
155             self.last_cache = arv_put.ResumeCache(cachefile.name)
156             path = cachefile.name
157         self.last_cache.save('test')
158         self.assertRaises(arv_put.ResumeCacheConflict,
159                           arv_put.ResumeCache, path)
160
161     def test_destroy_cache(self):
162         cachefile = tempfile.NamedTemporaryFile(delete=False)
163         try:
164             cache = arv_put.ResumeCache(cachefile.name)
165             cache.save('test')
166             cache.destroy()
167             try:
168                 arv_put.ResumeCache(cachefile.name)
169             except arv_put.ResumeCacheConflict:
170                 self.fail("could not load cache after destroying it")
171             self.assertRaises(ValueError, cache.load)
172         finally:
173             if os.path.exists(cachefile.name):
174                 os.unlink(cachefile.name)
175
176     def test_restart_cache(self):
177         path = os.path.join(self.make_tmpdir(), 'cache')
178         cache = arv_put.ResumeCache(path)
179         cache.save('test')
180         cache.restart()
181         self.assertRaises(ValueError, cache.load)
182         self.assertRaises(arv_put.ResumeCacheConflict,
183                           arv_put.ResumeCache, path)
184
185
186 class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
187     def setUp(self):
188         super(ArvadosPutCollectionWriterTest, self).setUp()
189         with tempfile.NamedTemporaryFile(delete=False) as cachefile:
190             self.cache = arv_put.ResumeCache(cachefile.name)
191             self.cache_filename = cachefile.name
192
193     def tearDown(self):
194         super(ArvadosPutCollectionWriterTest, self).tearDown()
195         if os.path.exists(self.cache_filename):
196             self.cache.destroy()
197         self.cache.close()
198
199     def test_writer_caches(self):
200         cwriter = arv_put.ArvPutCollectionWriter(self.cache)
201         cwriter.write_file('/dev/null')
202         cwriter.cache_state()
203         self.assertTrue(self.cache.load())
204         self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
205
206     def test_writer_works_without_cache(self):
207         cwriter = arv_put.ArvPutCollectionWriter()
208         cwriter.write_file('/dev/null')
209         self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
210
211     def test_writer_resumes_from_cache(self):
212         cwriter = arv_put.ArvPutCollectionWriter(self.cache)
213         with self.make_test_file() as testfile:
214             cwriter.write_file(testfile.name, 'test')
215             cwriter.cache_state()
216             new_writer = arv_put.ArvPutCollectionWriter.from_cache(
217                 self.cache)
218             self.assertEquals(
219                 ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
220                 new_writer.manifest_text())
221
222     def test_new_writer_from_stale_cache(self):
223         cwriter = arv_put.ArvPutCollectionWriter(self.cache)
224         with self.make_test_file() as testfile:
225             cwriter.write_file(testfile.name, 'test')
226         new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
227         new_writer.write_file('/dev/null')
228         self.assertEquals(". 0:0:null\n", new_writer.manifest_text())
229
230     def test_new_writer_from_empty_cache(self):
231         cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
232         cwriter.write_file('/dev/null')
233         self.assertEquals(". 0:0:null\n", cwriter.manifest_text())
234
235     def test_writer_resumable_after_arbitrary_bytes(self):
236         cwriter = arv_put.ArvPutCollectionWriter(self.cache)
237         # These bytes are intentionally not valid UTF-8.
238         with self.make_test_file('\x00\x07\xe2') as testfile:
239             cwriter.write_file(testfile.name, 'test')
240             cwriter.cache_state()
241             new_writer = arv_put.ArvPutCollectionWriter.from_cache(
242                 self.cache)
243         self.assertEquals(cwriter.manifest_text(), new_writer.manifest_text())
244
245     def make_progress_tester(self):
246         progression = []
247         def record_func(written, expected):
248             progression.append((written, expected))
249         return progression, record_func
250
251     def test_progress_reporting(self):
252         for expect_count in (None, 8):
253             progression, reporter = self.make_progress_tester()
254             cwriter = arv_put.ArvPutCollectionWriter(
255                 reporter=reporter, bytes_expected=expect_count)
256             with self.make_test_file() as testfile:
257                 cwriter.write_file(testfile.name, 'test')
258             cwriter.finish_current_stream()
259             self.assertIn((4, expect_count), progression)
260
261     def test_resume_progress(self):
262         cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
263         with self.make_test_file() as testfile:
264             # Set up a writer with some flushed bytes.
265             cwriter.write_file(testfile.name, 'test')
266             cwriter.finish_current_stream()
267             cwriter.cache_state()
268             new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
269             self.assertEqual(new_writer.bytes_written, 4)
270
271
272 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
273     TEST_SIZE = os.path.getsize(__file__)
274
275     def test_expected_bytes_for_file(self):
276         self.assertEquals(self.TEST_SIZE,
277                           arv_put.expected_bytes_for([__file__]))
278
279     def test_expected_bytes_for_tree(self):
280         tree = self.make_tmpdir()
281         shutil.copyfile(__file__, os.path.join(tree, 'one'))
282         shutil.copyfile(__file__, os.path.join(tree, 'two'))
283         self.assertEquals(self.TEST_SIZE * 2,
284                           arv_put.expected_bytes_for([tree]))
285         self.assertEquals(self.TEST_SIZE * 3,
286                           arv_put.expected_bytes_for([tree, __file__]))
287
288     def test_expected_bytes_for_device(self):
289         self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
290         self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
291
292
293 class ArvadosPutReportTest(ArvadosBaseTestCase):
294     def test_machine_progress(self):
295         for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
296             expect = ": {} written {} total\n".format(
297                 count, -1 if (total is None) else total)
298             self.assertTrue(
299                 arv_put.machine_progress(count, total).endswith(expect))
300
301     def test_known_human_progress(self):
302         for count, total in [(0, 1), (2, 4), (45, 60)]:
303             expect = '{:.1%}'.format(float(count) / total)
304             actual = arv_put.human_progress(count, total)
305             self.assertTrue(actual.startswith('\r'))
306             self.assertIn(expect, actual)
307
308     def test_unknown_human_progress(self):
309         for count in [1, 20, 300, 4000, 50000]:
310             self.assertTrue(re.search(r'\b{}\b'.format(count),
311                                       arv_put.human_progress(count, None)))
312
313
314 class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
315     def test_simple_file_put(self):
316         with self.make_test_file() as testfile:
317             path = testfile.name
318             arv_put.main(['--stream', '--no-progress', path])
319         self.assertTrue(
320             os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
321                                         '098f6bcd4621d373cade4e832627b4f6')),
322             "did not find file stream in Keep store")
323
324     def test_short_put_from_stdin(self):
325         # Have to run this separately since arv-put can't read from the
326         # tests' stdin.
327         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
328         # case, because the /proc entry is already gone by the time it tries.
329         pipe = subprocess.Popen(
330             [sys.executable, arv_put.__file__, '--stream'],
331             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
332             stderr=open('/dev/null', 'w'))
333         pipe.stdin.write('stdin test\n')
334         pipe.stdin.close()
335         deadline = time.time() + 5
336         while (pipe.poll() is None) and (time.time() < deadline):
337             time.sleep(.1)
338         if pipe.returncode is None:
339             pipe.terminate()
340             self.fail("arv-put did not PUT from stdin within 5 seconds")
341         self.assertEquals(pipe.returncode, 0)
342         self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())
343
344
345 if __name__ == '__main__':
346     unittest.main()