11789: Added tests.
[arvados.git] / sdk / python / tests / test_arv_put.py
1 from __future__ import absolute_import
2 from __future__ import division
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import str
6 from builtins import range
7 import apiclient
8 import mock
9 import os
10 import pwd
11 import re
12 import shutil
13 import subprocess
14 import sys
15 import tempfile
16 import time
17 import unittest
18 import yaml
19 import threading
20 import hashlib
21 import random
22 import uuid
23
24 import arvados
25 import arvados.commands.put as arv_put
26 from . import arvados_testutil as tutil
27
28 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
29 from . import run_test_server
30
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache.

    Covers cache path generation (stability, uniqueness, normalization of
    equivalent argument lists), basic persistence, file locking, and
    recovery of partially-uploaded state after a simulated failure.
    """

    # Argument lists whose cache paths must be stable across calls and
    # distinct from one another.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            # Not every test creates self.last_cache.
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the cache path arv-put would use for this argument list."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Restore the original configuration so later tests see it.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # Bug fix: this was `self.assertRaises(None, resume_cache.check_cache())`,
        # which called check_cache() eagerly and passed None as the expected
        # exception class, so it asserted nothing meaningful.  The intent is
        # that a failed HEAD request must be handled inside check_cache()
        # without propagating; calling it directly fails the test if it raises.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        # restart() empties the cache but keeps holding the lock.
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
240
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob.

    Exercises symlink handling, cache-backed resumable uploads, progress
    reporting, and graceful failure when a block commit is interrupted.
    Several tests simulate a crash mid-upload by patching
    ArvadosFileWriter.write to raise SystemExit on the last block, then
    verify what a second upload job does with the leftover cache state.
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation: four files of 1..4 KB plus a 5 KB file in a
        # subdirectory, used by the directory-upload tests.
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep a reference to the real (unpatched) write method so the
        # wrapped_write helpers below can delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        # Use a random UUID so the path cannot accidentally exist.
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            # Second run should skip the already-uploaded bytes.
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression, record_func): the callback appends each
        (written, expected) report to the shared progression list."""
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        # Files 1..4 KB at the top level plus 5 KB in the subdirectory.
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        # Combined first-run writes plus second-run writes (minus any
        # bytes the resume logic skipped) must cover the whole file.
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        # With resume disabled, nothing is skipped: the whole file is
        # re-uploaded from scratch.
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        # No cache means no skipping: the whole file is re-uploaded.
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del(self.writer)

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del(self.writer)
522
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Checks ArvPutUploadJob.bytes_expected for files, trees and devices."""

    # Size of this test file itself, used as a known byte count.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        job = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE, job.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))

        # The directory alone holds two copies' worth of bytes.
        self.assertEqual(
            self.TEST_SIZE * 2,
            arv_put.ArvPutUploadJob([tree]).bytes_expected)
        # Directory plus the file itself: three copies' worth.
        self.assertEqual(
            self.TEST_SIZE * 3,
            arv_put.ArvPutUploadJob([tree, __file__]).bytes_expected)

    def test_expected_bytes_for_device(self):
        # Device nodes have no meaningful size, so no expectation is set --
        # even when mixed with regular files.
        self.assertIsNone(
            arv_put.ArvPutUploadJob(['/dev/null']).bytes_expected)
        self.assertIsNone(
            arv_put.ArvPutUploadJob([__file__, '/dev/null']).bytes_expected)
548
549
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Tests for arv-put's machine- and human-readable progress reports."""

    def test_machine_progress(self):
        for written, expected in [(0, 1), (0, None), (1, None), (235, 9283)]:
            # An unknown total is reported as -1 in the machine format.
            shown_total = -1 if expected is None else expected
            suffix = ": {} written {} total\n".format(written, shown_total)
            report = arv_put.machine_progress(written, expected)
            self.assertTrue(report.endswith(suffix))

    def test_known_human_progress(self):
        for written, expected in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(1.0 * written / expected)
            report = arv_put.human_progress(written, expected)
            # Human-readable reports rewrite the same terminal line.
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percentage, report)

    def test_unknown_human_progress(self):
        for written in [1, 20, 300, 4000, 50000]:
            # Without a total, the raw byte count must still appear.
            self.assertTrue(re.search(r'\b{}\b'.format(written),
                                      arv_put.human_progress(written, None)))
569
570
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """End-to-end tests of the arv-put command-line entry point (main)."""

    MAIN_SERVER = {}
    # Syntactically valid UUID that is guaranteed not to exist on the server.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main(args), capturing stdout/stderr on self."""
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a small test file via main() and verify it reached Keep.

        Bug fix: the original signature used a mutable default (``args=[]``),
        which is shared across all calls; use None as the sentinel instead.
        """
        if args is None:
            args = []
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        # Force main() to build a fresh API client for every test.
        arv_put.api_client = None

    def tearDown(self):
        # Close and drop any captured output buffers left by the test.
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            # arv-put should fall back to running without a resume cache.
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            # Restore permissions so the tmpdir can be cleaned up.
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            # The cache subdirectory cannot even be created; upload must
            # still succeed without it.
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            # Each --replication value must reach Keep as the 'copies' kwarg.
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            # main() must exit non-zero, after actually attempting the save,
            # and must not emit a manifest on stdout.
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())
693
694
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
    """Integration tests that run arv-put against live test servers,
    both in-process (via arv_put helpers) and as a subprocess.
    """

    def _getKeepServerConfig():
        """Return the Keep server config dict for TestCaseWithServers.

        Reads blob_signing_key out of the API server's Rails config so
        the test Keep server enforces permission signatures the same way
        the API server signs them; falls back to an unsigned, unenforced
        configuration when no key is configured.

        NOTE: intentionally has no 'self' parameter -- it is called once
        at class definition time (below) to initialize KEEP_SERVER.
        """
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                # safe_load: the config is plain data, and yaml.load
                # without an explicit Loader is deprecated and unsafe.
                rails_config = yaml.safe_load(f)
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        # Bug fix: this key was previously misspelled 'blog_signing_key'.
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        # arv-put subprocesses must see the same module search path as
        # this test process.
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        # Force arv-put to build a fresh API client in every test.
        arv_put.api_client = None

    def authorize_with(self, token_name):
        """Authorize this process and the subprocess env as token_name."""
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        """Return the user record for the currently authorized token."""
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        # A UUID whose middle part names no known object type.
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        # A well-formed group UUID that matches no fixture.
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            # Bug fix: pipe.stdout yields bytes; decode before writing to
            # the text-mode sys.stdout under Python 3.
            sys.stdout.write(pipe.stdout.read().decode())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=None):
        """Run arv-put with extra_args, feeding text to its stdin, and
        return the single collection record it created or updated.

        arv-put prints a UUID (or, with --portable-data-hash, a PDH) on
        stdout; look the collection up by whichever one it printed.
        """
        # None default instead of a shared mutable [] default argument.
        if extra_args is None:
            extra_args = []
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection
        # OTOH, 'bar' should have been directly uploaded on the root collection
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        # Unnamed collections get an auto-generated "Saved at ... by user@"
        # name.
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        # Without --project-uuid, the collection lands in the user's home
        # project (owner_uuid == the user's own UUID).
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                 tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # None of the file2.txt & file3.txt should have been uploaded
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                 tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file2.txt should have been uploaded; a path-style
        # pattern excludes only the matching subdirectory entry.
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
965
966
# Allow running this test module directly, outside the project test runner.
if __name__ == '__main__':
    unittest.main()