# Merge branch 'master' into 3140-project-content-tabs
# [arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import os
6 import re
7 import shutil
8 import subprocess
9 import sys
10 import tempfile
11 import time
12 import unittest
13 import yaml
14
15 from cStringIO import StringIO
16
17 import arvados
18 import arvados.commands.put as arv_put
19
20 from arvados_testutil import ArvadosBaseTestCase, ArvadosKeepLocalStoreTestCase
21 import run_test_server
22
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, storage,
    persistence, locking, and destruction.
    """
    # Argument lists that should each map to a distinct, stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test assigns self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache path ResumeCache derives from a parsed arglist."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the original API host setting for later tests.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        # The lock should survive the cache file being deleted and
        # replaced at the same path.
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # After restart, the cache is empty but still holds its lock.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
196
197
class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
    """Tests for arv_put.ArvPutCollectionWriter: caching, resuming,
    and progress reporting against a local Keep store.
    """
    def setUp(self):
        super(ArvadosPutCollectionWriterTest, self).setUp()
        # Each test gets a fresh ResumeCache backed by a real file.
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            self.cache = arv_put.ResumeCache(cachefile.name)
            self.cache_filename = cachefile.name

    def tearDown(self):
        super(ArvadosPutCollectionWriterTest, self).tearDown()
        if os.path.exists(self.cache_filename):
            self.cache.destroy()
        self.cache.close()

    def test_writer_caches(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        cwriter.write_file('/dev/null')
        cwriter.cache_state()
        self.assertTrue(self.cache.load())
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter()
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumes_from_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
            self.assertEqual(
                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
                new_writer.manifest_text())

    def test_new_writer_from_stale_cache(self):
        # Cached state that was never updated after the write should not
        # leak into a writer resumed from it.
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        new_writer.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())

    def test_new_writer_from_empty_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        cwriter.write_file('/dev/null')
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumable_after_arbitrary_bytes(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        # These bytes are intentionally not valid UTF-8.
        with self.make_test_file('\x00\x07\xe2') as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())

    def make_progress_tester(self):
        """Return (progression, record_func): record_func appends each
        (written, expected) report to the progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        # Check reporting both with and without a known expected size.
        for expect_count in (None, 8):
            progression, reporter = self.make_progress_tester()
            cwriter = arv_put.ArvPutCollectionWriter(
                reporter=reporter, bytes_expected=expect_count)
            with self.make_test_file() as testfile:
                cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            self.assertIn((4, expect_count), progression)

    def test_resume_progress(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
        with self.make_test_file() as testfile:
            # Set up a writer with some flushed bytes.
            cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
            self.assertEqual(new_writer.bytes_written, 4)
282
283
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for on files, trees, and devices."""
    # Use this test file itself as a fixture of known size.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Device files have no meaningful size, so expect None.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
303
304
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for arv-put's progress-report formatting helpers."""

    def test_machine_progress(self):
        cases = [(0, 1), (0, None), (1, None), (235, 9283)]
        for written, total in cases:
            shown_total = -1 if (total is None) else total
            suffix = ": {} written {} total\n".format(written, shown_total)
            report = arv_put.machine_progress(written, total)
            self.assertTrue(report.endswith(suffix))

    def test_known_human_progress(self):
        for written, total in [(0, 1), (2, 4), (45, 60)]:
            report = arv_put.human_progress(written, total)
            # Human-readable reports rewrite the line in place.
            self.assertTrue(report.startswith('\r'))
            self.assertIn('{:.1%}'.format(float(written) / total), report)

    def test_unknown_human_progress(self):
        for written in [1, 20, 300, 4000, 50000]:
            pattern = r'\b{}\b'.format(written)
            self.assertTrue(
                re.search(pattern, arv_put.human_progress(written, None)))
324
325
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
    """Tests of the arv-put command-line entry point using a local Keep store."""

    def call_main_on_test_file(self):
        # Run arv-put on a small test file and confirm its data block
        # shows up in the local Keep store.
        self.main_output = StringIO()
        with self.make_test_file() as testfile:
            arv_put.main(['--stream', '--no-progress', testfile.name],
                         self.main_output)
        block_path = os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                  '098f6bcd4621d373cade4e832627b4f6')
        self.assertTrue(os.path.exists(block_path),
                        "did not find file stream in Keep store")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        saved_cache_dir = arv_put.ResumeCache.CACHE_DIR
        locked_dir = self.make_tmpdir()
        os.chmod(locked_dir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = locked_dir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = saved_cache_dir
            os.chmod(locked_dir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        saved_cache_dir = arv_put.ResumeCache.CACHE_DIR
        locked_dir = self.make_tmpdir()
        os.chmod(locked_dir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(locked_dir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = saved_cache_dir
            os.chmod(locked_dir, 0o700)

    def test_short_put_from_stdin(self):
        # Have to run this separately since arv-put can't read from the
        # tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        child = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        child.stdin.write('stdin test\n')
        child.stdin.close()
        deadline = time.time() + 5
        while (time.time() < deadline) and (child.poll() is None):
            time.sleep(.1)
        exit_status = child.poll()
        if exit_status is None:
            child.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif exit_status != 0:
            sys.stdout.write(child.stdout.read())
            self.fail("arv-put returned exit code {}".format(exit_status))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      child.stdout.read())
384
385
class ArvPutIntegrationTest(unittest.TestCase):
    """Integration tests: run arv-put against live test API/Keep servers
    with permission enforcement enabled.
    """
    @classmethod
    def setUpClass(cls):
        # Make sure arv-put talks to the real test Keep servers, not a
        # local-store stub left over from other test cases.
        try:
            del os.environ['KEEP_LOCAL_STORE']
        except KeyError:
            pass

        # Use the blob_signing_key from the Rails "test" configuration
        # to provision the Keep server.
        with open(os.path.join(os.path.dirname(__file__),
                               run_test_server.ARV_API_SERVER_DIR,
                               "config",
                               "application.yml")) as f:
            # safe_load: plain-data YAML only; yaml.load can construct
            # arbitrary Python objects from tagged documents.
            rails_config = yaml.safe_load(f.read())
        try:
            config_blob_signing_key = rails_config["test"]["blob_signing_key"]
        except KeyError:
            # Fall back to the shared "common" section.
            config_blob_signing_key = rails_config["common"]["blob_signing_key"]
        run_test_server.run()
        run_test_server.run_keep(blob_signing_key=config_blob_signing_key,
                                 enforce_permissions=True)

    @classmethod
    def tearDownClass(cls):
        run_test_server.stop()
        run_test_server.stop_keep()

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        run_test_server.authorize_with('active')
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            os.environ[v] = arvados.config.settings()[v]

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        api = arvados.api('v1', cache=False)
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            api.collections().get(uuid=manifest_uuid).execute()

        datadir = tempfile.mkdtemp()
        try:
            with open(os.path.join(datadir, "foo"), "w") as f:
                f.write("The quick brown fox jumped over the lazy dog")
            p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                                 stdout=subprocess.PIPE)
            (arvout, arverr) = p.communicate()
            self.assertEqual(p.returncode, 0)
            # stderr was not captured, so communicate() reports None.
            self.assertEqual(arverr, None)
            self.assertEqual(arvout.strip(), manifest_uuid)

            # The manifest text stored in the API server under the same
            # manifest UUID must use signed locators.
            c = api.collections().get(uuid=manifest_uuid).execute()
            self.assertRegexpMatches(
                c['manifest_text'],
                r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
        finally:
            # Clean up the scratch directory even if an assertion failed.
            os.remove(os.path.join(datadir, "foo"))
            os.rmdir(datadir)
450
451
# Run the full suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()