Merge branch 'master' into 2955-fail-orphan-jobs
[arvados.git] / sdk / python / tests / test_arv_put.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import apiclient
5 import os
6 import re
7 import shutil
8 import subprocess
9 import sys
10 import tempfile
11 import time
12 import unittest
13 import yaml
14
15 import arvados
16 import arvados.commands.put as arv_put
17
18 from arvados_testutil import ArvadosBaseTestCase, ArvadosKeepLocalStoreTestCase
19 import run_test_server
20
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: path naming, storage and locking."""

    # Argument lists that must each map to a distinct, stable cache path.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp'],
        ['/tmp', '--max-manifest-depth', '0'],
        ['/tmp', '--max-manifest-depth', '1']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        # Tests store any cache they open in self.last_cache so it gets
        # destroyed here.  getattr() is used instead of wrapping the whole
        # call in `except AttributeError`, which would also hide
        # AttributeErrors raised from inside destroy() itself.
        last_cache = getattr(self, 'last_cache', None)
        if last_cache is not None:
            last_cache.destroy()

    def cache_path_from_arglist(self, arglist):
        """Return the ResumeCache path arv-put would use for `arglist`."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Put the shared settings back exactly as they were.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            # Stored in last_cache so tearDown releases it.
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, self.last_cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            # Stored in last_cache so tearDown releases the lock it holds.
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        # Create the file, then close our own handle immediately: only the
        # ResumeCache objects below should hold it open.
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            cache_path = cachefile.name
        try:
            cache = arv_put.ResumeCache(cache_path)
            cache.save('test')
            cache.destroy()
            try:
                # Stored in last_cache so tearDown releases it.
                self.last_cache = arv_put.ResumeCache(cache_path)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cache_path):
                os.unlink(cache_path)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        # Stored in last_cache so tearDown releases it.
        self.last_cache = arv_put.ResumeCache(path)
        self.last_cache.save('test')
        self.last_cache.restart()
        self.assertRaises(ValueError, self.last_cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
194
195
class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
    """Tests for arv_put.ArvPutCollectionWriter and its resume cache."""

    def setUp(self):
        super(ArvadosPutCollectionWriterTest, self).setUp()
        # delete=False: the cache file must outlive the NamedTemporaryFile
        # handle; tearDown removes it.
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            self.cache = arv_put.ResumeCache(cachefile.name)
            self.cache_filename = cachefile.name

    def tearDown(self):
        # Release the cache even if the base class tearDown fails.
        try:
            super(ArvadosPutCollectionWriterTest, self).tearDown()
        finally:
            if os.path.exists(self.cache_filename):
                self.cache.destroy()
            self.cache.close()

    def test_writer_caches(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        cwriter.write_file('/dev/null')
        cwriter.cache_state()
        self.assertTrue(self.cache.load())
        self.assertEqual(". 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter()
        cwriter.write_file('/dev/null')
        self.assertEqual(". 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumes_from_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
            self.assertEqual(
                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
                new_writer.manifest_text())

    def test_new_writer_from_stale_cache(self):
        # cache_state() is never called, so the cache holds nothing from
        # cwriter and the new writer must start fresh.
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        with self.make_test_file() as testfile:
            cwriter.write_file(testfile.name, 'test')
        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        new_writer.write_file('/dev/null')
        self.assertEqual(". 0:0:null\n", new_writer.manifest_text())

    def test_new_writer_from_empty_cache(self):
        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
        cwriter.write_file('/dev/null')
        self.assertEqual(". 0:0:null\n", cwriter.manifest_text())

    def test_writer_resumable_after_arbitrary_bytes(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
        # These bytes are intentionally not valid UTF-8.
        with self.make_test_file('\x00\x07\xe2') as testfile:
            cwriter.write_file(testfile.name, 'test')
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
                self.cache)
        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())

    def make_progress_tester(self):
        """Return (progression, reporter): reporter appends to progression."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        for expect_count in (None, 8):
            progression, reporter = self.make_progress_tester()
            cwriter = arv_put.ArvPutCollectionWriter(
                reporter=reporter, bytes_expected=expect_count)
            with self.make_test_file() as testfile:
                cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            self.assertIn((4, expect_count), progression)

    def test_resume_progress(self):
        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
        with self.make_test_file() as testfile:
            # Set up a writer with some flushed bytes.
            cwriter.write_file(testfile.name, 'test')
            cwriter.finish_current_stream()
            cwriter.cache_state()
            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
            self.assertEqual(new_writer.bytes_written, 4)
280
281
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Tests for arv_put.expected_bytes_for()."""

    # Size of this very file, used as a known byte count.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                         arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        # A directory holding two copies of this file, alone and together
        # with the file itself.
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                         arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                         arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        # Including a device such as /dev/null makes the total unknown.
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
301
302
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for the machine- and human-readable progress formatters."""

    def test_machine_progress(self):
        # The machine report ends with ": <written> written <total> total\n",
        # where an unknown total is shown as -1.
        cases = ((0, 1), (0, None), (1, None), (235, 9283))
        for written, expected_total in cases:
            shown_total = expected_total if expected_total is not None else -1
            suffix = ": {} written {} total\n".format(written, shown_total)
            report = arv_put.machine_progress(written, expected_total)
            self.assertTrue(report.endswith(suffix))

    def test_known_human_progress(self):
        # With a known total, the human report starts with a carriage return
        # and contains the completed fraction as a one-decimal percentage.
        for written, expected_total in ((0, 1), (2, 4), (45, 60)):
            report = arv_put.human_progress(written, expected_total)
            percent = '{:.1%}'.format(float(written) / expected_total)
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percent, report)

    def test_unknown_human_progress(self):
        # With no known total, the byte count appears as a whole word.
        for written in (1, 20, 300, 4000, 50000):
            report = arv_put.human_progress(written, None)
            pattern = r'\b{}\b'.format(written)
            self.assertTrue(re.search(pattern, report))
322
323
class ArvadosPutTest(ArvadosKeepLocalStoreTestCase):
    """Runs arv-put end to end against the local Keep store."""

    def call_main_on_test_file(self):
        """Run arv-put in-process, in --stream mode, on a fresh test file.

        Asserts that the block 098f6bcd4621d373cade4e832627b4f6 (the MD5 of
        "test") was written into the KEEP_LOCAL_STORE directory.
        """
        with self.make_test_file() as testfile:
            path = testfile.name
            arv_put.main(['--stream', '--no-progress', path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def _put_with_inaccessible_cache_dir(self, subdir=None):
        """Check that arv-put still works when its cache dir is unusable.

        Points ResumeCache.CACHE_DIR at a fresh temporary directory with all
        permissions removed, or at `subdir` inside it when given, then runs
        call_main_on_test_file().  CACHE_DIR and the directory's permissions
        are always restored, so the directory can be cleaned up later.
        """
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        try:
            if subdir is None:
                arv_put.ResumeCache.CACHE_DIR = cachedir
            else:
                arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, subdir)
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        self._put_with_inaccessible_cache_dir()

    def test_put_with_unwritable_cache_subdir(self):
        self._put_with_inaccessible_cache_dir('cachedir')

    def test_short_put_from_stdin(self):
        # Have to run this separately since arv-put can't read from the
        # tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        try:
            deadline = time.time() + 5
            while (pipe.poll() is None) and (time.time() < deadline):
                time.sleep(.1)
            returncode = pipe.poll()
            if returncode is None:
                pipe.terminate()
                # Reap the killed child so it does not linger as a zombie.
                pipe.wait()
                self.fail("arv-put did not PUT from stdin within 5 seconds")
            output = pipe.stdout.read()
        finally:
            pipe.stdout.close()
        if returncode != 0:
            sys.stdout.write(output)
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', output)
381
382
class ArvPutIntegrationTest(unittest.TestCase):
    """Runs arv-put against live API and Keep test servers."""

    @classmethod
    def setUpClass(cls):
        # Hide KEEP_LOCAL_STORE so arv-put uses the Keep server started
        # below (presumably it would otherwise write to the local store).
        # The previous value is restored in tearDownClass.
        cls._orig_keep_local_store = os.environ.pop('KEEP_LOCAL_STORE', None)

        # Use the blob_signing_key from the Rails "test" configuration
        # to provision the Keep server.
        with open(os.path.join(os.path.dirname(__file__),
                               run_test_server.ARV_API_SERVER_DIR,
                               "config",
                               "application.yml")) as f:
            # safe_load: this is plain configuration data.  yaml.load()
            # without an explicit Loader is deprecated since PyYAML 5.1 and
            # raises TypeError on PyYAML 6.
            rails_config = yaml.safe_load(f)
        config_blob_signing_key = rails_config["test"]["blob_signing_key"]
        run_test_server.run()
        run_test_server.run_keep(blob_signing_key=config_blob_signing_key,
                                 enforce_permissions=True)

    @classmethod
    def tearDownClass(cls):
        # Nested finally blocks keep one failing shutdown step from
        # skipping the remaining ones.
        try:
            run_test_server.stop()
        finally:
            try:
                run_test_server.stop_keep()
            finally:
                if cls._orig_keep_local_store is not None:
                    os.environ['KEEP_LOCAL_STORE'] = cls._orig_keep_local_store

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        run_test_server.authorize_with('active')
        env_names = ["ARVADOS_API_HOST",
                     "ARVADOS_API_HOST_INSECURE",
                     "ARVADOS_API_TOKEN"]
        # Remember the caller's values so that the settings exported for the
        # arv-put subprocess don't leak into tests that run afterwards.
        orig_env = dict((name, os.environ.get(name)) for name in env_names)
        datadir = tempfile.mkdtemp()
        try:
            for v in env_names:
                os.environ[v] = arvados.config.settings()[v]

            # Before doing anything, demonstrate that the collection
            # we're about to create is not present in our test fixture.
            api = arvados.api('v1', cache=False)
            manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
            with self.assertRaises(apiclient.errors.HttpError):
                api.collections().get(uuid=manifest_uuid).execute()

            with open(os.path.join(datadir, "foo"), "w") as f:
                f.write("The quick brown fox jumped over the lazy dog")
            p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                                 stdout=subprocess.PIPE)
            (arvout, arverr) = p.communicate()
            self.assertEqual(p.returncode, 0)
            self.assertIsNone(arverr)
            self.assertEqual(arvout.strip(), manifest_uuid)

            # The manifest text stored in the API server under the same
            # manifest UUID must use signed locators.
            c = api.collections().get(uuid=manifest_uuid).execute()
            self.assertRegexpMatches(
                c['manifest_text'],
                r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
        finally:
            # Clean up even when an assertion above fails.
            shutil.rmtree(datadir, ignore_errors=True)
            for name, value in orig_env.items():
                if value is None:
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = value
444
445
# Run this module's test cases when it is executed directly as a script.
if __name__ == '__main__':
    unittest.main()